> expected;
+
+ /**
+ * Initialization method of the RichGroupReduceFunction operator
+ *
+ * This runs at the initialization of the operator and receives a
+ * configuration parameter object. It initializes all required structures
+ * used by this operator such as profile managers, operations managers,
+ * topology managers etc.
+ *
+ * @param parameters
+ * A flink Configuration object
+ */
+ @Override
+ public void open(Configuration parameters) throws IOException {
+
+ this.runDate = params.getRequired("run.date");
+ // Get data from broadcast variables
+ this.mps = getRuntimeContext().getBroadcastVariable("mps");
+ this.ops = getRuntimeContext().getBroadcastVariable("ops");
+ this.egp = getRuntimeContext().getBroadcastVariable("egp");
+ this.ggp = getRuntimeContext().getBroadcastVariable("ggp");
+ this.conf = getRuntimeContext().getBroadcastVariable("conf");
+
+ // Initialize metric profile manager
+ this.mpsMgr = new MetricProfileManager();
+ this.mpsMgr.loadFromList(mps);
+ // Initialize operations manager
+ this.opsMgr = new OpsManager();
+ this.opsMgr.loadJsonString(ops);
+
+ // Initialize endpoint group manager
+ this.egpMgr = new EndpointGroupManager();
+ this.egpMgr.loadFromList(egp);
+
+ // Initialize group-of-groups topology manager
+ this.ggpMgr = new GroupGroupManager();
+ this.ggpMgr.loadFromList(ggp);
+
+ // Initialize report configuration manager
+ this.confMgr = new ConfigManager();
+ this.confMgr.loadJsonString(conf);
+
+ // (duplicate run.date read removed - already assigned at the start of open)
+ this.egroupType = this.confMgr.egroup;
+
+ }
+
+
+ /**
+ * Reads the topology endpoint group list and the metric profile and produces the set of
+ * expected service endpoint metrics, as tuple objects of the form
+ * (endpoint_group, service, hostname, metric).
+ **/
+ public void initExpected() {
+ // NOTE(review): generic type parameters appear stripped in this diff rendering
+ this.expected = new HashSet>();
+ // only the first metric profile is used (the job is assumed to carry a single profile)
+ String mProfile = this.mpsMgr.getProfiles().get(0);
+ for (GroupEndpoint servPoint: this.egp){
+
+
+ ArrayList metrics = this.mpsMgr.getProfileServiceMetrics(mProfile, servPoint.getService());
+
+ // services absent from the metric profile contribute no expected metrics
+ if (metrics==null) continue;
+ for (String metric:metrics){
+ this.expected.add(new Tuple4(servPoint.getGroup(),servPoint.getService(),servPoint.getHostname(),metric));
+ }
+
+ }
+
+
+
+
+
+ }
+
+ /**
+ * Iterates over all metric data and gathers a set of encountered service endpoint metrics.
+ * It then subtracts this set from the set of expected service endpoint metrics (based on
+ * topology) so that the missing service endpoint metrics are identified. For each missing
+ * service endpoint metric a corresponding "missing" status metric record is generated.
+ *
+ * @param in
+ * An Iterable collection of MetricData objects
+ * @param out
+ * A Collector list of Missing MonData objects
+ */
+ @Override
+ public void reduce(Iterable in, Collector out) throws Exception {
+
+ // build the expected (group,service,hostname,metric) tuple set from topology + profile
+ initExpected();
+
+ // NOTE(review): generic type parameters appear stripped in this diff rendering
+ Set> found = new HashSet>();
+
+ String service = "";
+ String endpointGroup = "";
+ String hostname = "";
+ String metric = "";
+
+ // missing items are timestamped at the very start of the run date
+ String timestamp = this.runDate + "T00:00:00Z";
+ // status value assigned to missing items, as defined by the operations profile
+ String state = this.opsMgr.getDefaultMissing();
+
+
+ for (MetricData item : in) {
+
+ service = item.getService();
+ hostname = item.getHostname();
+ metric = item.getMetric();
+
+ // Filter By endpoint group if belongs to supergroup
+ ArrayList groupnames = egpMgr.getGroup(egroupType, hostname, service);
+
+ for (String groupname : groupnames) {
+ if (ggpMgr.checkSubGroup(groupname) == true) {
+ endpointGroup = groupname;
+ found.add(new Tuple4(endpointGroup, service, hostname, metric));
+ }
+
+ }
+
+
+
+ }
+
+
+ // Clone expected set to missing (because missing is going to be mutated after subtraction)
+ Set> missing = new HashSet>(this.expected);
+ // The result of the subtraction is in missing set
+ missing.removeAll(found);
+
+
+
+
+ // For each item in missing create a missing metric data entry
+ for (Tuple4 item:missing){
+ StatusMetric mn = new StatusMetric();
+ // Create a StatusMetric output
+ // Grab the timestamp to generate the date and time integer fields
+ // that are exclusively used in datastore for indexing
+ String timestamp2 = timestamp.split("Z")[0];
+ String[] tsToken = timestamp2.split("T");
+ int dateInt = Integer.parseInt(tsToken[0].replace("-", ""));
+ int timeInt = Integer.parseInt(tsToken[1].replace(":",""));
+ mn.setGroup(item.f0);
+ mn.setService(item.f1);
+ mn.setHostname(item.f2);
+ mn.setMetric(item.f3);
+ mn.setStatus(state);
+ mn.setMessage("");
+ mn.setSummary("");
+ mn.setTimestamp(timestamp);
+ mn.setDateInt(dateInt);
+ mn.setTimeInt(timeInt);
+
+ out.collect(mn);
+
+
+ }
+
+
+
+ }
+
+}
diff --git a/flink_jobs/batch_status/src/main/java/argo/batch/MongoStatusOutput.java b/flink_jobs/batch_status/src/main/java/argo/batch/MongoStatusOutput.java
index ae713355..65b52e98 100644
--- a/flink_jobs/batch_status/src/main/java/argo/batch/MongoStatusOutput.java
+++ b/flink_jobs/batch_status/src/main/java/argo/batch/MongoStatusOutput.java
@@ -121,7 +121,12 @@ private Document prepDoc(StatusMetric record) {
.append("summary", record.getSummary())
.append("time_integer",record.getTimeInt())
.append("previous_state",record.getPrevState())
- .append("previous_ts", record.getPrevTs());
+ .append("previous_timestamp", record.getPrevTs())
+ // append the actual data to status metric record in datastore
+ .append("actual_data", record.getActualData())
+ // append original status and threshold rule applied
+ .append("original_status", record.getOgStatus())
+ .append("threshold_rule_applied", record.getRuleApplied());
}
diff --git a/flink_jobs/batch_status/src/main/java/argo/batch/PickEndpoints.java b/flink_jobs/batch_status/src/main/java/argo/batch/PickEndpoints.java
index e3edd6eb..717af10a 100644
--- a/flink_jobs/batch_status/src/main/java/argo/batch/PickEndpoints.java
+++ b/flink_jobs/batch_status/src/main/java/argo/batch/PickEndpoints.java
@@ -4,6 +4,7 @@
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -14,12 +15,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.esotericsoftware.minlog.Log;
+
import argo.avro.GroupEndpoint;
import argo.avro.GroupGroup;
import argo.avro.MetricData;
import argo.avro.MetricProfile;
import ops.ConfigManager;
import ops.OpsManager;
+import ops.ThresholdManager;
import sync.AggregationProfileManager;
import sync.EndpointGroupManager;
import sync.GroupGroupManager;
@@ -48,11 +52,17 @@ public PickEndpoints(ParameterTool params){
private List ggp;
private List rec;
private List cfg;
+ private List thr;
+ private List ops;
+ private List aps;
+ private OpsManager opsMgr;
private MetricProfileManager mpsMgr;
private EndpointGroupManager egpMgr;
private GroupGroupManager ggpMgr;
private RecomputationsManager recMgr;
private ConfigManager cfgMgr;
+ private ThresholdManager thrMgr;
+ private AggregationProfileManager apsMgr;
private String egroupType;
@@ -65,6 +75,10 @@ public void open(Configuration parameters) throws IOException, ParseException {
this.ggp = getRuntimeContext().getBroadcastVariable("ggp");
this.rec = getRuntimeContext().getBroadcastVariable("rec");
this.cfg = getRuntimeContext().getBroadcastVariable("conf");
+ this.thr = getRuntimeContext().getBroadcastVariable("thr");
+ this.ops = getRuntimeContext().getBroadcastVariable("ops");
+ this.aps = getRuntimeContext().getBroadcastVariable("aps");
+
// Initialize Recomputation manager
this.recMgr = new RecomputationsManager();
@@ -84,7 +98,23 @@ public void open(Configuration parameters) throws IOException, ParseException {
this.cfgMgr = new ConfigManager();
this.cfgMgr.loadJsonString(cfg);
+ // Initialize Ops Manager
+ this.opsMgr = new OpsManager();
+ this.opsMgr.loadJsonString(ops);
+
+ // Initialize Aggregation Profile manager
+ this.apsMgr = new AggregationProfileManager();
+ this.apsMgr.loadJsonString(aps);
+
this.egroupType = cfgMgr.egroup;
+
+ // Initialize Threshold manager
+ this.thrMgr = new ThresholdManager();
+ if (!this.thr.get(0).isEmpty()){
+ this.thrMgr.parseJSON(this.thr.get(0));
+ }
+
+
}
@@ -93,6 +123,7 @@ public void open(Configuration parameters) throws IOException, ParseException {
public void flatMap(MetricData md, Collector out) throws Exception {
String prof = mpsMgr.getProfiles().get(0);
+ String aprof = apsMgr.getAvProfiles().get(0);
String hostname = md.getHostname();
String service = md.getService();
String metric = md.getMetric();
@@ -102,10 +133,14 @@ public void flatMap(MetricData md, Collector out) throws Exception
// Filter By monitoring engine
if (recMgr.isMonExcluded(monHost, ts) == true) return;
+ // Filter By aggregation profile
+ if (apsMgr.checkService(aprof, service) == false) return;
// Filter By metric profile
if (mpsMgr.checkProfileServiceMetric(prof, service, metric) == false) return;
-
+
+
+
// Filter By endpoint group if belongs to supergroup
ArrayList groupnames = egpMgr.getGroup(egroupType, hostname, service);
@@ -117,8 +152,33 @@ public void flatMap(MetricData md, Collector out) throws Exception
String[] tsToken = timestamp2.split("T");
int dateInt = Integer.parseInt(tsToken[0].replace("-", ""));
int timeInt = Integer.parseInt(tsToken[1].replace(":",""));
+ String status = md.getStatus();
+ String actualData = md.getActualData();
+ String ogStatus = "";
+ String ruleApplied = "";
+
+ if (actualData != null) {
+ // Check for relevant rule
+ String rule = thrMgr.getMostRelevantRule(groupname, md.getHostname(), md.getMetric());
+ // if rule is indeed found
+ if (rule != ""){
+ // get the retrieved values from the actual data
+ Map values = thrMgr.getThresholdValues(actualData);
+ // calculate
+ String[] statusNext = thrMgr.getStatusByRuleAndValues(rule, this.opsMgr, "AND", values);
+ if (statusNext[0] == "") statusNext[0] = status;
+ LOG.info("{},{},{} data:({}) {} --> {}",groupname,md.getHostname(),md.getMetric(),values,status,statusNext[0]);
+ if (status != statusNext[0]) {
+ ogStatus = status;
+ ruleApplied = statusNext[1];
+ status = statusNext[0];
+ }
+ }
+
+
+ }
- StatusMetric sm = new StatusMetric(groupname,md.getService(),md.getHostname(),md.getMetric(), md.getStatus(),md.getTimestamp(),dateInt,timeInt,md.getSummary(),md.getMessage(),"","");
+ StatusMetric sm = new StatusMetric(groupname,md.getService(),md.getHostname(),md.getMetric(), status,md.getTimestamp(),dateInt,timeInt,md.getSummary(),md.getMessage(),"","",actualData, ogStatus, ruleApplied);
out.collect(sm);
}
diff --git a/flink_jobs/batch_status/src/main/java/argo/batch/StatusMetric.java b/flink_jobs/batch_status/src/main/java/argo/batch/StatusMetric.java
index acf315aa..9bf147c1 100644
--- a/flink_jobs/batch_status/src/main/java/argo/batch/StatusMetric.java
+++ b/flink_jobs/batch_status/src/main/java/argo/batch/StatusMetric.java
@@ -16,6 +16,9 @@ public class StatusMetric {
private String message;
private String prevState;
private String prevTs;
+ private String actualData;
+ private String ogStatus; // original status from monitoring host
+ private String ruleApplied; // threshold rule applied - empty if not
public StatusMetric(){
this.group = "";
@@ -30,10 +33,13 @@ public StatusMetric(){
this.message = "";
this.prevState = "";
this.prevTs = "";
+ this.actualData = "";
+ this.ogStatus = "";
+ this.ruleApplied = "";
}
public StatusMetric(String group, String service, String hostname, String metric, String status, String timestamp,
- int dateInt, int timeInt, String summary, String message, String prevState, String prevTs) {
+ int dateInt, int timeInt, String summary, String message, String prevState, String prevTs, String actualData, String ogStatus, String ruleApplied) {
this.group = group;
this.service = service;
@@ -47,6 +53,9 @@ public StatusMetric(String group, String service, String hostname, String metric
this.message = message;
this.prevState = prevState;
this.prevTs = prevTs;
+ this.actualData = actualData;
+ this.ogStatus = ogStatus;
+ this.ruleApplied = ruleApplied;
}
@@ -128,10 +137,31 @@ public void setPrevTs(String prevTs) {
this.prevTs = prevTs;
}
+ public String getActualData() {
+ return actualData;
+ }
+ public void setActualData(String actualData) {
+ this.actualData = actualData;
+ }
+
+ public String getOgStatus() {
+ return ogStatus;
+ }
+ public void setOgStatus(String ogStatus) {
+ this.ogStatus = ogStatus;
+ }
+
+ public String getRuleApplied() {
+ return ruleApplied;
+ }
+ public void setRuleApplied(String ruleApplied) {
+ this.ruleApplied = ruleApplied;
+ }
+
@Override
public String toString() {
return "(" + this.group + "," + this.service + "," + this.hostname + "," + this.metric + "," + this.status + "," + this.timestamp + "," +
- this.dateInt + "," + this.timeInt + "," + this.prevState + "," + this.prevTs + ")";
+ this.dateInt + "," + this.timeInt + "," + this.prevState + "," + this.prevTs + "," + this.actualData + "," + this.ogStatus + "," + this.ruleApplied + ")";
}
}
diff --git a/flink_jobs/batch_status/src/main/java/ops/ConfigManager.java b/flink_jobs/batch_status/src/main/java/ops/ConfigManager.java
index 30af0f9d..bdc3069a 100644
--- a/flink_jobs/batch_status/src/main/java/ops/ConfigManager.java
+++ b/flink_jobs/batch_status/src/main/java/ops/ConfigManager.java
@@ -5,62 +5,59 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
-import java.util.Map.Entry;
import java.util.List;
import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
-
-
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
+
public class ConfigManager {
private static final Logger LOG = Logger.getLogger(ConfigManager.class.getName());
public String id; // report uuid reference
- public String tenant;
public String report;
+ public String tenant;
public String egroup; // endpoint group
public String ggroup; // group of groups
- public String agroup; // alternative group
public String weight; // weight factor type
public TreeMap egroupTags;
public TreeMap ggroupTags;
public TreeMap mdataTags;
-
public ConfigManager() {
- this.tenant = null;
this.report = null;
this.id = null;
+ this.tenant = null;
this.egroup = null;
this.ggroup = null;
this.weight = null;
this.egroupTags = new TreeMap();
this.ggroupTags = new TreeMap();
this.mdataTags = new TreeMap();
-
+
}
public void clear() {
- this.id=null;
- this.tenant = null;
+ this.id = null;
this.report = null;
+ this.tenant = null;
this.egroup = null;
this.ggroup = null;
this.weight = null;
this.egroupTags.clear();
this.ggroupTags.clear();
this.mdataTags.clear();
-
- }
+ }
+
public String getReportID() {
return id;
}
@@ -73,10 +70,10 @@ public String getTenant() {
return tenant;
}
+
public String getEgroup() {
return egroup;
}
-
public void loadJson(File jsonFile) throws IOException {
// Clear data
@@ -90,30 +87,39 @@ public void loadJson(File jsonFile) throws IOException {
JsonElement jElement = jsonParser.parse(br);
JsonObject jObj = jElement.getAsJsonObject();
// Get the simple fields
- this.id = jObj.getAsJsonPrimitive("id").getAsString();
- this.tenant = jObj.getAsJsonPrimitive("tenant").getAsString();
- this.report = jObj.getAsJsonPrimitive("job").getAsString();
- this.egroup = jObj.getAsJsonPrimitive("egroup").getAsString();
- this.ggroup = jObj.getAsJsonPrimitive("ggroup").getAsString();
- this.weight = jObj.getAsJsonPrimitive("weight").getAsString();
- this.agroup = jObj.getAsJsonPrimitive("altg").getAsString();
- // Get compound fields
- JsonObject jEgroupTags = jObj.getAsJsonObject("egroup_tags");
- JsonObject jGgroupTags = jObj.getAsJsonObject("ggroup_tags");
- JsonObject jMdataTags = jObj.getAsJsonObject("mdata_tags");
- JsonObject jDataMap = jObj.getAsJsonObject("datastore_maps");
- // Iterate fields
- for (Entry item : jEgroupTags.entrySet()) {
-
- this.egroupTags.put(item.getKey(), item.getValue().getAsString());
- }
- for (Entry item : jGgroupTags.entrySet()) {
-
- this.ggroupTags.put(item.getKey(), item.getValue().getAsString());
+ this.id = jObj.get("id").getAsString();
+ this.tenant = jObj.get("tenant").getAsString();
+ this.report = jObj.get("info").getAsJsonObject().get("name").getAsString();
+
+ // get topology schema names
+ JsonObject topoGroup = jObj.get("topology_schema").getAsJsonObject().getAsJsonObject("group");
+ this.ggroup = topoGroup.get("type").getAsString();
+ this.egroup = topoGroup.get("group").getAsJsonObject().get("type").getAsString();
+
+ // optional weight filtering
+ this.weight = "";
+ if (jObj.has("weight")){
+ this.weight = jObj.get("weight").getAsString();
}
- for (Entry item : jMdataTags.entrySet()) {
-
- this.mdataTags.put(item.getKey(), item.getValue().getAsString());
+ // Get compound fields
+ JsonArray jTags = jObj.getAsJsonArray("filter_tags");
+
+ // Iterate tags
+ if (jTags != null) {
+ for (JsonElement tag : jTags) {
+ JsonObject jTag = tag.getAsJsonObject();
+ String name = jTag.get("name").getAsString();
+ String value = jTag.get("value").getAsString();
+ String ctx = jTag.get("context").getAsString();
+ if (ctx.equalsIgnoreCase("group_of_groups")){
+ this.ggroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("endpoint_groups")){
+ this.egroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("metric_data")) {
+ this.mdataTags.put(name, value);
+ }
+
+ }
}
@@ -130,6 +136,7 @@ public void loadJson(File jsonFile) throws IOException {
}
}
+
/**
* Loads Report config information from a config json string
@@ -146,30 +153,37 @@ public void loadJsonString(List confJson) throws JsonParseException {
JsonElement jElement = jsonParser.parse(confJson.get(0));
JsonObject jObj = jElement.getAsJsonObject();
// Get the simple fields
- this.id = jObj.getAsJsonPrimitive("id").getAsString();
- this.tenant = jObj.getAsJsonPrimitive("tenant").getAsString();
- this.report = jObj.getAsJsonPrimitive("job").getAsString();
- this.egroup = jObj.getAsJsonPrimitive("egroup").getAsString();
- this.ggroup = jObj.getAsJsonPrimitive("ggroup").getAsString();
- this.weight = jObj.getAsJsonPrimitive("weight").getAsString();
- this.agroup = jObj.getAsJsonPrimitive("altg").getAsString();
- // Get compound fields
- JsonObject jEgroupTags = jObj.getAsJsonObject("egroup_tags");
- JsonObject jGgroupTags = jObj.getAsJsonObject("ggroup_tags");
- JsonObject jMdataTags = jObj.getAsJsonObject("mdata_tags");
-
- // Iterate fields
- for (Entry item : jEgroupTags.entrySet()) {
-
- this.egroupTags.put(item.getKey(), item.getValue().getAsString());
- }
- for (Entry item : jGgroupTags.entrySet()) {
-
- this.ggroupTags.put(item.getKey(), item.getValue().getAsString());
+ this.id = jObj.get("id").getAsString();
+ this.tenant = jObj.get("tenant").getAsString();
+ this.report = jObj.get("info").getAsJsonObject().get("name").getAsString();
+ // get topology schema names
+ JsonObject topoGroup = jObj.get("topology_schema").getAsJsonObject().getAsJsonObject("group");
+ this.ggroup = topoGroup.get("type").getAsString();
+ this.egroup = topoGroup.get("group").getAsJsonObject().get("type").getAsString();
+ // optional weight filtering
+ this.weight = "";
+ if (jObj.has("weight")){
+ this.weight = jObj.get("weight").getAsString();
}
- for (Entry item : jMdataTags.entrySet()) {
-
- this.mdataTags.put(item.getKey(), item.getValue().getAsString());
+ // Get compound fields
+ JsonArray jTags = jObj.getAsJsonArray("tags");
+
+ // Iterate tags
+ if (jTags != null) {
+ for (JsonElement tag : jTags) {
+ JsonObject jTag = tag.getAsJsonObject();
+ String name = jTag.get("name").getAsString();
+ String value = jTag.get("value").getAsString();
+ String ctx = jTag.get("context").getAsString();
+ if (ctx.equalsIgnoreCase("group_of_groups")){
+ this.ggroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("endpoint_groups")){
+ this.egroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("metric_data")) {
+ this.mdataTags.put(name, value);
+ }
+
+ }
}
} catch (JsonParseException ex) {
diff --git a/flink_jobs/batch_status/src/main/java/ops/ThresholdManager.java b/flink_jobs/batch_status/src/main/java/ops/ThresholdManager.java
new file mode 100644
index 00000000..e69a0190
--- /dev/null
+++ b/flink_jobs/batch_status/src/main/java/ops/ThresholdManager.java
@@ -0,0 +1,754 @@
+package ops;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+
+/**
+ * @author kaggis
+ *
+ */
+public class ThresholdManager {
+
+ private static final Logger LOG = Logger.getLogger(ThresholdManager.class.getName());
+
+ // Nested map that holds rule definitions:
+ // rule key ("group/host/metric") maps to a map of
+ // label -> threshold rule
+ private Map> rules;
+
+ // Reverse index checks for group, host, metrics
+ private HashSet metrics;
+ private HashSet hosts;
+ private HashSet groups;
+ private String aggregationOp = "AND";
+
+ public Map> getRules() {
+ return this.rules;
+ }
+
+ /**
+ * Threshold class implements objects that hold threshold values as they are
+ * parsed by a threshold expression such as the following one:
+ *
+ * label=30s;0:50,50:100,0,100
+ *
+ * A Threshold object can be directly constructed from a string including an
+ * expression as the above
+ *
+ * Each threshold object stores the threshold expression and the individual
+ * parsed items such as value, uom, warning range, critical range and min,max
+ * values
+ *
+ */
+ class Threshold {
+
+ private static final String defWarning = "WARNING";
+ private static final String defCritical = "CRITICAL";
+
+ private String expression;
+ private String label;
+ private Float value;
+ private String uom;
+ private Range warning;
+ private Range critical;
+ private Float min;
+ private Float max;
+
+
+
+ /**
+ * Constructs a threshold from a string containing a threshold expression
+ *
+ * @param expression
+ * A string containing a threshold exception as the following one:
+ * label=30s;0:50,50:100,0,100
+ *
+ */
+ public Threshold(String expression) {
+ Threshold temp = parseAndSet(expression);
+ this.expression = temp.expression;
+ this.label = temp.label;
+ this.value = temp.value;
+ this.uom = temp.uom;
+ this.warning = temp.warning;
+ this.critical = temp.critical;
+ this.min = temp.min;
+ this.max = temp.max;
+
+ }
+
+ /**
+ * Create a new threshold object by providing each parameter
+ *
+ * @param expression
+ * string containing the threshold expression
+ * @param label
+ * threshold label
+ * @param value
+ * threshold value
+ * @param uom
+ * unit of measurement - optional
+ * @param warning
+ * a range determining warning statuses
+ * @param critical
+ * a range determining critical statuses
+ * @param min
+ * minimum value available for this threshold
+ * @param max
+ * maximum value available for this threshold
+ */
+ public Threshold(String expression, String label, float value, String uom, Range warning, Range critical,
+ float min, float max) {
+
+ this.expression = expression;
+ this.label = label;
+ this.value = value;
+ this.uom = uom;
+ this.warning = warning;
+ this.critical = critical;
+ this.min = min;
+ this.max = max;
+ }
+
+ public String getExpression() {
+ return expression;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public float getValue() {
+ return value;
+ }
+
+ public String getUom() {
+ return uom;
+ }
+
+ public Range getWarning() {
+ return warning;
+ }
+
+ public Range getCritical() {
+ return critical;
+ }
+
+ public float getMin() {
+ return min;
+ }
+
+ public float getMax() {
+ return max;
+ }
+
+ /**
+ * Parses a threshold expression string and returns a Threshold object
+ *
+ * @param threshold
+ * string containing the threshold expression
+ * @return Threshold object
+ */
+ public Threshold parseAndSet(String threshold) {
+
+ String pThresh = threshold;
+ String curLabel = "";
+ String curUom = "";
+ Float curValue = Float.NaN;
+ Range curWarning = new Range(); // empty range
+ Range curCritical = new Range(); // empty range
+ Float curMin = Float.NaN;
+ Float curMax = Float.NaN;
+ // find label by splitting at =
+ String[] tokens = pThresh.split("=");
+ // Must have two tokens to continue, label=something
+ if (tokens.length == 2) {
+ curLabel = tokens[0];
+
+ // Split right value by ; to find the array of arguments
+ String[] subtokens = tokens[1].split(";");
+ // Must have size > 0 at least a value
+ if (subtokens.length > 0) {
+ curUom = getUOM(subtokens[0]);
+ curValue = Float.parseFloat(subtokens[0].replaceAll(curUom, ""));
+ if (subtokens.length > 1) {
+ // iterate over rest of subtokens
+ for (int i = 1; i < subtokens.length; i++) {
+ if (i == 1) {
+ // parse warning range
+ curWarning = new Range(subtokens[i]);
+ continue;
+ } else if (i == 2) {
+ // parse critical
+ curCritical = new Range(subtokens[i]);
+ continue;
+ } else if (i == 3) {
+ // parse min
+ curMin = Float.parseFloat(subtokens[i]);
+ continue;
+ } else if (i == 4) {
+ // parse max
+ curMax = Float.parseFloat(subtokens[i]);
+ }
+ }
+ }
+
+ }
+
+ }
+
+ return new Threshold(threshold, curLabel, curValue, curUom, curWarning, curCritical, curMin, curMax);
+
+ }
+
+ /**
+ * Reads a threshold string value and extracts the unit of measurement if
+ * present
+ *
+ * @param value
+ * String containing a representation of the value and uom
+ * @return String representing the uom.
+ */
+ public String getUOM(String value) {
+ // check if ends with digit
+ if (Character.isDigit(value.charAt(value.length() - 1))) {
+ return "";
+ }
+
+ // check if ends with seconds
+ if (value.endsWith("s"))
+ return "s";
+ if (value.endsWith("us"))
+ return "us";
+ if (value.endsWith("ms"))
+ return "ms";
+ if (value.endsWith("%"))
+ return "%";
+ if (value.endsWith("B"))
+ return "B";
+ if (value.endsWith("KB"))
+ return "KB";
+ if (value.endsWith("MB"))
+ return "MB";
+ if (value.endsWith("TB"))
+ return "TB";
+ if (value.endsWith("c"))
+ return "c";
+
+ // Not valid range
+ throw new RuntimeException("Invalid Unit of measurement: " + value);
+
+ }
+
+ /**
+ * Checks an external value against this threshold's warning/critical ranges. If a
+ * range contains the value the corresponding status ("WARNING" or "CRITICAL") is
+ * returned; otherwise "" is returned.
+ *
+ * @param value the external value to evaluate (NaN or infinite yields "")
+ * @return string with the status result "WARNING", "CRITICAL" or ""
+ */
+ public String calcStatusWithValue(Float value) {
+
+ // guard on the supplied value (not this.value): we evaluate external data here
+ if (!Float.isFinite(value))
+ return "";
+ if (!this.warning.isUndef()) {
+ if (this.warning.contains(value))
+ return defWarning;
+ }
+ if (!this.critical.isUndef()) {
+ if (this.critical.contains(value))
+ return defCritical;
+ }
+ return "";
+ }
+
+ /**
+ * Checks a threshold's value against warning,critical ranges. If a range
+ * contains the value (warning or critical) the corresponding status is returned
+ * as string "WARNING" or "CRITICAL". If the threshold doesn't provide the
+ * needed data to decide on status an "" is returned back.
+ *
+ * @return string with the status result "WARNING", "CRITICAL"
+ */
+ public String calcStatus() {
+
+ if (!Float.isFinite(this.value))
+ return "";
+ if (!this.warning.isUndef()) {
+ if (this.warning.contains(this.value))
+ return defWarning;
+ }
+ if (!this.critical.isUndef()) {
+ if (this.critical.contains(this.value))
+ return defCritical;
+ }
+
+ return "";
+ }
+
+ public String toString() {
+ String strWarn = "";
+ String strCrit = "";
+ String strMin = "";
+ String strMax = "";
+
+ if (this.warning != null)
+ strWarn = this.warning.toString();
+ if (this.critical != null)
+ strCrit = this.critical.toString();
+ if (this.min != null)
+ strMin = this.min.toString();
+ if (this.max != null)
+ strMax = this.max.toString();
+
+ return "[expression=" + this.expression + ", label=" + this.label + ", value=" + this.value + ", uom="
+ + this.uom + ", warning=" + strWarn + ", critical=" + strCrit + ", min=" + strMin + ", max="
+ + strMax + ")";
+ }
+
+ }
+
+ /**
+ * Range implements a simple object that holds a threshold's critical or warning
+ * range. It includes a floor,ceil as floats and an exclude flag when a range is
+ * supposed to be used for exclusion and not inclusion. The threshold spec uses
+ * an '@' character in front of a range to define inversion(exclusion)
+ *
+ * Inclusion assumes that floor < value < ceil and not floor <= value <= ceil
+ *
+ */
+ class Range {
+ Float floor;
+ Float ceil;
+ Boolean exclude;
+
+ /**
+ * Creates an empty range. Invert is false and limits are NaN
+ */
+ public Range() {
+ this.floor = Float.NaN;
+ this.ceil = Float.NaN;
+ this.exclude = false;
+ }
+
+ /**
+ * Creates a range by parameters
+ *
+ * @param floor
+ * Float that defines the lower limit of the range
+ * @param ceil
+ * Float that defines the upper limit of the range
+ * @param exclude
+ * boolean that defines if the range is used for exclusion (true) or
+ * inclusion (false)
+ */
+ public Range(Float floor, Float ceil, Boolean exclude) {
+ this.floor = floor;
+ this.ceil = ceil;
+ this.exclude = exclude;
+ }
+
+ /**
+ * Creates a range by parsing a range expression string like the following one:
+ * '0:10'
+ *
+ * @param range
+ * string including a range expression
+ */
+ public Range(String range) {
+ Range tmp = parseAndSet(range);
+ this.floor = tmp.floor;
+ this.ceil = tmp.ceil;
+ this.exclude = tmp.exclude;
+ }
+
+ /**
+ * Checks if a Range is undefined (floor or ceiling is NaN).
+ * Note: must use Float.isNaN() because (x == Float.NaN) is always false.
+ * @return boolean
+ */
+ public boolean isUndef() {
+ return Float.isNaN(this.floor) || Float.isNaN(this.ceil);
+ }
+
+ /**
+ * Checks if a value is included in range (or truly excluded if range is an
+ * exclusion)
+ *
+ * @param value
+ * Float
+ * @return boolean
+ */
+ public boolean contains(Float value) {
+ boolean result = value > this.floor && value < this.ceil;
+ if (this.exclude) {
+ return !result;
+ }
+ return result;
+ }
+
+ /**
+ * Parses a range expression string and creates a Range object Range expressions
+ * can be in the following forms:
+ *
+ * - 10 - range starting from 0 to 10
+ * - 10: - range starting from 10 to infinity
+ * - ~:20 - range starting from negative inf. up to 20
+ * - 20:30 - range between two numbers
+ * - @20:30 - inverted range, excludes between two numbers
+ *
+ *
+ * @param expression
+ * String containing a range expression
+ * @return
+ */
+ public Range parseAndSet(String expression) {
+ String parsedRange = expression;
+ Float curFloor = 0F;
+ Float curCeil = 0F;
+ boolean curInv = false;
+ if (parsedRange.replaceAll(" ", "").equals("")) {
+ return new Range();
+ }
+ // check if invert
+ if (parsedRange.startsWith("@")) {
+ curInv = true;
+ // after check remove @ from range string
+ parsedRange = parsedRange.replaceAll("^@", "");
+ }
+
+ // check if range string doesn't have separator :
+ if (!parsedRange.contains(":")) {
+ // then we are in the case of a single number like 10
+ // which defines the rule 0 --> 10 so
+ curFloor = 0F;
+ curCeil = Float.parseFloat(parsedRange);
+
+ return new Range(curFloor, curCeil, curInv);
+ }
+
+ // check if range end with separator :
+ if (parsedRange.endsWith(":")) {
+ parsedRange = parsedRange.replaceAll(":$", "");
+ // then we are in the case of a single number like 10:
+ // which defines the rule 10 --> positive infinity
+ curFloor = Float.parseFloat(parsedRange);
+ curCeil = Float.POSITIVE_INFINITY;
+ return new Range(curFloor, curCeil, curInv);
+ }
+
+ // tokenize string without prefixes
+ String[] tokens = parsedRange.split(":");
+ if (tokens.length == 2) {
+ // check if token[0] is negative infinity ~
+ if (tokens[0].equalsIgnoreCase("~")) {
+ curFloor = Float.NEGATIVE_INFINITY;
+ } else {
+ curFloor = Float.parseFloat(tokens[0]);
+ }
+
+ curCeil = Float.parseFloat(tokens[1]);
+ return new Range(curFloor, curCeil, curInv);
+ }
+
+ // Not valid range
+ throw new RuntimeException("Invalid threshold: " + expression);
+
+ }
+
+ public String toString() {
+ return "(floor=" + this.floor + ",ceil=" + this.ceil + ",invert=" + this.exclude.toString() + ")";
+ }
+
+ }
+
+ /**
+ * Creates a Manager that parses rules files with thresholds and stores them
+ * internally as objects. A ThresholdManager can be used to automatically
+ * calculate statuses about a monitoring item (group,host,metric) based on the
+ * most relevant threshold rules stored in it.
+ */
+ public ThresholdManager() {
+
+ this.rules = new HashMap>();
+ this.hosts = new HashSet();
+ this.groups = new HashSet();
+ this.metrics = new HashSet();
+
+ }
+
+ /**
+ * Return the default operation when aggregating statuses generated from multiple threshold rules
+ * @return
+ */
+ public String getAggregationOp() {
+ return this.aggregationOp;
+ }
+
+
+ /**
+ * @param op string with the name of the operation to be used in the aggregation (AND,OR,custom one)
+ */
+ public void setAggregationOp(String op) {
+ this.aggregationOp = op;
+ }
+
+ /**
+ * Returns a status calculation for a specific rule key. Each rule key is defined
+ * as follows: 'group/host/metric' and leads to a threshold rule. Group and host
+ * parts are optional as such: 'group//metric' or '/host/metric' or '//metric'
+ *
+ * @param rule
+ * string containing a rule key
+ * @param opsMgr
+ * an OpsManager Object to handle status aggregations
+ * @param opType
+ * an OpsManager operation to be used (like 'OR', 'AND')
+ * @return string with status result
+ */
+ public String getStatusByRule(String rule, OpsManager opsMgr, String opType) {
+
+ if (!rules.containsKey(rule))
+ return "";
+ String status = "";
+ Map tholds = rules.get(rule);
+ for (Entry thold : tholds.entrySet()) {
+ // first step
+ if (status == "") {
+ status = thold.getValue().calcStatus();
+ continue;
+ }
+ String statusNext = thold.getValue().calcStatus();
+ if (statusNext != "") {
+ status = opsMgr.op(opType, status, statusNext);
+ }
+ }
+ return status;
+ }
+
+ /**
+ * Returns a status calculation for a specific rule key. Each rule key is defined
+ * as follows: 'group/host/metric' and leads to a threshold rule. Group and host
+ * parts are optional as such: 'group//metric' or '/host/metric' or '//metric'
+ *
+ * @param rule
+ * string containing a rule key
+ * @param opsMgr
+ * an OpsManager Object to handle status aggregations
+ * @param opType
+ * an OpsManager operation to be used (like 'OR', 'AND')
+ * @return string array with two elements. First element is the status result and second one the rule applied
+ */
+ public String[] getStatusByRuleAndValues(String rule, OpsManager opsMgr, String opType, Map values) {
+
+ if (!rules.containsKey(rule))
+ return new String[] {"",""};
+ String status = "";
+ String explain = "";
+ Map tholds = rules.get(rule);
+
+ for ( Entry value : values.entrySet()) {
+ String label = value.getKey();
+ if (tholds.containsKey(label)) {
+ Threshold th = tholds.get(label);
+ // first step
+ if (status == "") {
+
+ status = th.calcStatusWithValue(value.getValue());
+ explain = th.getExpression();
+ continue;
+ }
+
+ String statusNext = th.calcStatusWithValue(value.getValue());
+
+ if (statusNext != "") {
+ status = opsMgr.op(opType, status, statusNext);
+ explain = explain + " " + th.getExpression();
+ }
+ }
+ }
+
+
+ return new String[]{status,explain};
+
+ }
+
+ /**
+ * Gets the most relevant rule based on a monitoring item (group,host,metric)
+ * using the following precedence (specific to least specific) (group, host,
+ * metric) #1 ( , host, metric) #2 (group, , metric) #3 ( , , metric) #4
+ *
+ * @param group
+ * string with name of the monitored endpoint group
+ * @param host
+ * string with name of the monitored host
+ * @param metric
+ * string with name of the monitored metric
+ * @return a string with the relevant rule key
+ */
+ public String getMostRelevantRule(String group, String host, String metric) {
+ if (!this.metrics.contains(metric)) {
+ return ""; // nothing found
+ } else {
+
+ // order or precedence: more specific first
+ // group,host,metric #1
+ // ,host,metric #2
+ // group ,metric #3
+ // ,metric #4
+ if (this.hosts.contains(host)) {
+ if (this.groups.contains(group)) {
+ // check if combined entry indeed exists
+ String key = String.format("%s/%s/%s", group, host, metric);
+ if (this.rules.containsKey(key))
+ return key; // #1
+
+ } else {
+ return String.format("/%s/%s", host, metric); // #2
+ }
+ }
+
+ if (this.groups.contains(group)) {
+ // check if combined entry indeed exists
+ String key = String.format("%s//%s", group, metric); // #3
+ if (this.rules.containsKey(key))
+ return key;
+ }
+
+ return String.format("//%s", metric);
+ }
+
+ }
+
+ /**
+ * Parses an expression that might contain multiple labels=thresholds separated
+ * by whitespace and creates a HashMap of labels to parsed threshold objects
+ *
+ * @param thresholds
+ * an expression that might contain multiple thresholds
+ * @return a HashMap to Threshold objects
+ */
+ public Map parseThresholds(String thresholds) {
+ Map subMap = new HashMap();
+ // Tokenize with lookahead on the point when a new label starts
+ String[] tokens = thresholds.split("(;|[ ]+)(?=[a-zA-Z])");
+ for (String token : tokens) {
+ Threshold curTh = new Threshold(token);
+ if (curTh != null) {
+ subMap.put(curTh.getLabel(), curTh);
+ }
+ }
+ return subMap;
+ }
+
+ /**
+ * Parses an expression that might contain multiple labels=thresholds separated
+ * by whitespace and creates a HashMap of labels to parsed Float values
+ *
+ * @param thresholds
+ * an expression that might contain multiple thresholds
+ * @return a HashMap to Floats
+ */
+ public Map getThresholdValues(String thresholds) {
+ Map subMap = new HashMap();
+ // tokenize thresholds by whitespace
+ String[] tokens = thresholds.split("(;|[ ]+)(?=[a-zA-Z])");
+ for (String token : tokens) {
+ Threshold curTh = new Threshold(token);
+ if (curTh != null) {
+ subMap.put(curTh.getLabel(), curTh.getValue());
+ }
+ }
+ return subMap;
+ }
+
+ /**
+ * Parses a JSON threshold rule file and populates the ThresholdManager
+ *
+ * @param jsonFile
+ * File to be parsed
+ * @return boolean signaling whether operation succeeded or not
+ */
+ public boolean parseJSONFile(File jsonFile) {
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new FileReader(jsonFile));
+ String jsonStr = IOUtils.toString(br);
+ if (!parseJSON(jsonStr))
+ return false;
+
+ } catch (IOException ex) {
+ LOG.error("Could not open file:" + jsonFile.getName());
+ return false;
+
+ } catch (JsonParseException ex) {
+ LOG.error("File is not valid json:" + jsonFile.getName());
+ return false;
+ } finally {
+ // Close quietly without exceptions the buffered reader
+ IOUtils.closeQuietly(br);
+ }
+
+ return true;
+
+ }
+
+ /**
+ * Parses a json string with the appropriate threshold rule schema and populates
+ * the ThresholdManager
+ *
+ * @param jsonString
+ * string containing threshold rules in json format
+ * @return boolean signaling whether the parse information succeeded or not
+ */
+ public boolean parseJSON(String jsonString) {
+
+
+ JsonParser json_parser = new JsonParser();
+ JsonObject jRoot = json_parser.parse(jsonString).getAsJsonObject();
+ JsonArray jRules = jRoot.getAsJsonArray("rules");
+ for (JsonElement jRule : jRules) {
+ JsonObject jRuleObj = jRule.getAsJsonObject();
+ String ruleMetric = jRuleObj.getAsJsonPrimitive("metric").getAsString();
+ String ruleHost = "";
+ String ruleEgroup = "";
+
+ if (jRuleObj.has("host")) {
+ ruleHost = jRuleObj.getAsJsonPrimitive("host").getAsString();
+ }
+ if (jRuleObj.has("endpoint_group")) {
+ ruleEgroup = jRuleObj.getAsJsonPrimitive("endpoint_group").getAsString();
+ }
+
+ String ruleThr = jRuleObj.getAsJsonPrimitive("thresholds").getAsString();
+ this.metrics.add(ruleMetric);
+ if (ruleHost != "")
+ this.hosts.add(ruleHost);
+ if (ruleEgroup != "")
+ this.groups.add(ruleEgroup);
+ String full = ruleEgroup + "/" + ruleHost + "/" + ruleMetric;
+ Map thrMap = parseThresholds(ruleThr);
+ this.rules.put(full, thrMap);
+ }
+
+ return true;
+ }
+
+}
diff --git a/flink_jobs/batch_status/src/main/java/sync/AggregationProfileManager.java b/flink_jobs/batch_status/src/main/java/sync/AggregationProfileManager.java
index 6c526407..11648956 100644
--- a/flink_jobs/batch_status/src/main/java/sync/AggregationProfileManager.java
+++ b/flink_jobs/batch_status/src/main/java/sync/AggregationProfileManager.java
@@ -16,6 +16,7 @@
import java.util.List;
import java.util.Map.Entry;
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
@@ -235,28 +236,31 @@ public void loadJson(File jsonFile) throws IOException {
JsonElement jRootElement = jsonParser.parse(br);
JsonObject jRootObj = jRootElement.getAsJsonObject();
- JsonObject apGroups = jRootObj.getAsJsonObject("groups");
+ JsonArray apGroups = jRootObj.getAsJsonArray("groups");
// Create new entry for this availability profile
AvProfileItem tmpAvp = new AvProfileItem();
tmpAvp.name = jRootObj.get("name").getAsString();
tmpAvp.namespace = jRootObj.get("namespace").getAsString();
- tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsString();
- tmpAvp.metricOp = jRootObj.get("metric_ops").getAsString();
- tmpAvp.groupType = jRootObj.get("group_type").getAsString();
- tmpAvp.op = jRootObj.get("operation").getAsString();
+ tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsJsonObject().get("name").getAsString();
+ tmpAvp.metricOp = jRootObj.get("metric_operation").getAsString();
+ tmpAvp.groupType = jRootObj.get("endpoint_group").getAsString();
+ tmpAvp.op = jRootObj.get("profile_operation").getAsString();
- for (Entry item : apGroups.entrySet()) {
+ for ( JsonElement item : apGroups) {
// service name
- String itemName = item.getKey();
- JsonObject itemObj = item.getValue().getAsJsonObject();
+ JsonObject itemObj = item.getAsJsonObject();
+ String itemName = itemObj.get("name").getAsString();
String itemOp = itemObj.get("operation").getAsString();
- JsonObject itemServices = itemObj.get("services").getAsJsonObject();
+ JsonArray itemServices = itemObj.get("services").getAsJsonArray();
tmpAvp.insertGroup(itemName, itemOp);
- for (Entry subItem : itemServices.entrySet()) {
- tmpAvp.insertService(itemName, subItem.getKey(), subItem.getValue().getAsString());
+ for (JsonElement subItem : itemServices) {
+ JsonObject subObj = subItem.getAsJsonObject();
+ String serviceName = subObj.get("name").getAsString();
+ String serviceOp = subObj.get("operation").getAsString();
+ tmpAvp.insertService(itemName, serviceName,serviceOp);
}
}
@@ -289,28 +293,32 @@ public void loadJsonString(List apsJson) throws IOException {
JsonElement jRootElement = jsonParser.parse(apsJson.get(0));
JsonObject jRootObj = jRootElement.getAsJsonObject();
- JsonObject apGroups = jRootObj.getAsJsonObject("groups");
// Create new entry for this availability profile
AvProfileItem tmpAvp = new AvProfileItem();
+ JsonArray apGroups = jRootObj.getAsJsonArray("groups");
+
tmpAvp.name = jRootObj.get("name").getAsString();
tmpAvp.namespace = jRootObj.get("namespace").getAsString();
- tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsString();
- tmpAvp.metricOp = jRootObj.get("metric_ops").getAsString();
- tmpAvp.groupType = jRootObj.get("group_type").getAsString();
- tmpAvp.op = jRootObj.get("operation").getAsString();
+ tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsJsonObject().get("name").getAsString();
+ tmpAvp.metricOp = jRootObj.get("metric_operation").getAsString();
+ tmpAvp.groupType = jRootObj.get("endpoint_group").getAsString();
+ tmpAvp.op = jRootObj.get("profile_operation").getAsString();
- for (Entry item : apGroups.entrySet()) {
+ for ( JsonElement item : apGroups) {
// service name
- String itemName = item.getKey();
- JsonObject itemObj = item.getValue().getAsJsonObject();
+ JsonObject itemObj = item.getAsJsonObject();
+ String itemName = itemObj.get("name").getAsString();
String itemOp = itemObj.get("operation").getAsString();
- JsonObject itemServices = itemObj.get("services").getAsJsonObject();
+ JsonArray itemServices = itemObj.get("services").getAsJsonArray();
tmpAvp.insertGroup(itemName, itemOp);
- for (Entry subItem : itemServices.entrySet()) {
- tmpAvp.insertService(itemName, subItem.getKey(), subItem.getValue().getAsString());
+ for (JsonElement subItem : itemServices) {
+ JsonObject subObj = subItem.getAsJsonObject();
+ String serviceName = subObj.get("name").getAsString();
+ String serviceOp = subObj.get("operation").getAsString();
+ tmpAvp.insertService(itemName, serviceName,serviceOp);
}
}
diff --git a/flink_jobs/batch_status/src/main/resources/ops/EGI-algorithm.json b/flink_jobs/batch_status/src/main/resources/ops/EGI-algorithm.json
new file mode 100644
index 00000000..b88d8c99
--- /dev/null
+++ b/flink_jobs/batch_status/src/main/resources/ops/EGI-algorithm.json
@@ -0,0 +1,239 @@
+{
+ "id": "1b0318f0-429d-44fc-8bba-07184354c73b",
+ "name": "egi_ops",
+ "available_states": [
+ "OK",
+ "WARNING",
+ "UNKNOWN",
+ "MISSING",
+ "CRITICAL",
+ "DOWNTIME"
+ ],
+ "defaults": {
+ "down": "DOWNTIME",
+ "missing": "MISSING",
+ "unknown": "UNKNOWN"
+ },
+ "operations": [
+ {
+ "name": "AND",
+ "truth_table": [
+ {
+ "a": "OK",
+ "b": "OK",
+ "x": "OK"
+ },
+ {
+ "a": "OK",
+ "b": "WARNING",
+ "x": "WARNING"
+ },
+ {
+ "a": "OK",
+ "b": "UNKNOWN",
+ "x": "UNKNOWN"
+ },
+ {
+ "a": "OK",
+ "b": "MISSING",
+ "x": "MISSING"
+ },
+ {
+ "a": "OK",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "OK",
+ "b": "DOWNTIME",
+ "x": "DOWNTIME"
+ },
+ {
+ "a": "WARNING",
+ "b": "WARNING",
+ "x": "WARNING"
+ },
+ {
+ "a": "WARNING",
+ "b": "UNKNOWN",
+ "x": "UNKNOWN"
+ },
+ {
+ "a": "WARNING",
+ "b": "MISSING",
+ "x": "MISSING"
+ },
+ {
+ "a": "WARNING",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "WARNING",
+ "b": "DOWNTIME",
+ "x": "DOWNTIME"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "UNKNOWN",
+ "x": "UNKNOWN"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "MISSING",
+ "x": "MISSING"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "DOWNTIME",
+ "x": "DOWNTIME"
+ },
+ {
+ "a": "MISSING",
+ "b": "MISSING",
+ "x": "MISSING"
+ },
+ {
+ "a": "MISSING",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "MISSING",
+ "b": "DOWNTIME",
+ "x": "DOWNTIME"
+ },
+ {
+ "a": "CRITICAL",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "CRITICAL",
+ "b": "DOWNTIME",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "DOWNTIME",
+ "b": "DOWNTIME",
+ "x": "DOWNTIME"
+ }
+ ]
+ },
+ {
+ "name": "OR",
+ "truth_table": [
+ {
+ "a": "OK",
+ "b": "OK",
+ "x": "OK"
+ },
+ {
+ "a": "OK",
+ "b": "WARNING",
+ "x": "OK"
+ },
+ {
+ "a": "OK",
+ "b": "UNKNOWN",
+ "x": "OK"
+ },
+ {
+ "a": "OK",
+ "b": "MISSING",
+ "x": "OK"
+ },
+ {
+ "a": "OK",
+ "b": "CRITICAL",
+ "x": "OK"
+ },
+ {
+ "a": "OK",
+ "b": "DOWNTIME",
+ "x": "OK"
+ },
+ {
+ "a": "WARNING",
+ "b": "WARNING",
+ "x": "WARNING"
+ },
+ {
+ "a": "WARNING",
+ "b": "UNKNOWN",
+ "x": "WARNING"
+ },
+ {
+ "a": "WARNING",
+ "b": "MISSING",
+ "x": "WARNING"
+ },
+ {
+ "a": "WARNING",
+ "b": "CRITICAL",
+ "x": "WARNING"
+ },
+ {
+ "a": "WARNING",
+ "b": "DOWNTIME",
+ "x": "WARNING"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "UNKNOWN",
+ "x": "UNKNOWN"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "MISSING",
+ "x": "UNKNOWN"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "UNKNOWN",
+ "b": "DOWNTIME",
+ "x": "UNKNOWN"
+ },
+ {
+ "a": "MISSING",
+ "b": "MISSING",
+ "x": "MISSING"
+ },
+ {
+ "a": "MISSING",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "MISSING",
+ "b": "DOWNTIME",
+ "x": "DOWNTIME"
+ },
+ {
+ "a": "CRITICAL",
+ "b": "CRITICAL",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "CRITICAL",
+ "b": "DOWNTIME",
+ "x": "CRITICAL"
+ },
+ {
+ "a": "DOWNTIME",
+ "b": "DOWNTIME",
+ "x": "DOWNTIME"
+ }
+ ]
+ }
+ ]
+}
diff --git a/flink_jobs/batch_status/src/main/resources/ops/EGI-rules.json b/flink_jobs/batch_status/src/main/resources/ops/EGI-rules.json
new file mode 100644
index 00000000..2a52c99a
--- /dev/null
+++ b/flink_jobs/batch_status/src/main/resources/ops/EGI-rules.json
@@ -0,0 +1,33 @@
+{
+ "rules": [
+ {
+ "metric": "org.bdii.Freshness",
+ "thresholds": "freshness=10s;30;50:60;0;100 entries=5;0:10;20:30;50;30"
+ },
+ {
+ "metric": "org.bdii.Entries",
+ "thresholds": "time=-35s;~:10;15:;-100;300 entries=55;20;50:60;50;30"
+ },
+ {
+ "metric": "org.bdii.Freshness",
+ "thresholds": "freshness=10s; entries=29;;30:50",
+ "host" : "bdii.host3.example.foo"
+ },
+ {
+ "metric": "org.bdii.Freshness",
+ "thresholds": "freshness=10s;30;50:60;0;100 entries=29;0:10;20:30;0;30",
+ "host" : "bdii.host1.example.foo"
+ },
+ {
+ "metric": "org.bdii.Freshness",
+ "thresholds": "freshness=10s;30;50:60;0;100 entries=5;0:10;20:30;50;30",
+ "host" : "bdii.host1.example.foo",
+ "endpoint_group": "SITE-101"
+ },
+ {
+ "metric": "org.bdii.Freshness",
+ "thresholds": "freshness=10s;30;50:60;0;100 entries=5;0:10;20:30;50;30",
+ "endpoint_group": "SITE-101"
+ }
+ ]
+}
diff --git a/flink_jobs/batch_status/src/test/java/ops/ThresholdManagerTest.java b/flink_jobs/batch_status/src/test/java/ops/ThresholdManagerTest.java
new file mode 100644
index 00000000..b2250b33
--- /dev/null
+++ b/flink_jobs/batch_status/src/test/java/ops/ThresholdManagerTest.java
@@ -0,0 +1,91 @@
+package ops;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThresholdManagerTest {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Assert that files are present
+ assertNotNull("Test file missing", ThresholdManagerTest.class.getResource("/ops/EGI-algorithm.json"));
+ assertNotNull("Test file missing", ThresholdManagerTest.class.getResource("/ops/EGI-rules.json"));
+ }
+
+ @Test
+ public void test() throws IOException, URISyntaxException {
+
+ // Prepare Resource File
+ URL opsJsonFile = ThresholdManagerTest.class.getResource("/ops/EGI-algorithm.json");
+ File opsFile = new File(opsJsonFile.toURI());
+ // Instantiate class
+ OpsManager opsMgr = new OpsManager();
+ // Test loading file
+ opsMgr.loadJson(opsFile);
+
+ // Prepare Resource File
+ URL thrJsonFile = ThresholdManagerTest.class.getResource("/ops/EGI-rules.json");
+ File thrFile = new File(thrJsonFile.toURI());
+ // Instantiate class
+ ThresholdManager t = new ThresholdManager();
+ t.parseJSONFile(thrFile);
+
+ String[] expectedRules = new String[] { "//org.bdii.Freshness", "//org.bdii.Entries",
+ "/bdii.host1.example.foo/org.bdii.Freshness", "/bdii.host3.example.foo/org.bdii.Freshness",
+ "SITE-101/bdii.host1.example.foo/org.bdii.Freshness", "SITE-101//org.bdii.Freshness" };
+
+ assertEquals(expectedRules.length, t.getRules().entrySet().size());
+
+ for (String rule : expectedRules) {
+ assertEquals(true, t.getRules().keySet().contains(rule));
+ }
+
+ assertEquals("SITE-101/bdii.host1.example.foo/org.bdii.Freshness",
+ t.getMostRelevantRule("SITE-101", "bdii.host1.example.foo", "org.bdii.Freshness"));
+
+ assertEquals("SITE-101//org.bdii.Freshness",
+ t.getMostRelevantRule("SITE-101", "bdii.host2.example.foo", "org.bdii.Freshness"));
+
+ assertEquals("//org.bdii.Freshness",
+ t.getMostRelevantRule("SITE-202", "bdii.host2.example.foo", "org.bdii.Freshness"));
+
+ assertEquals("//org.bdii.Freshness",
+ t.getMostRelevantRule("SITE-202", "bdii.host2.example.foo", "org.bdii.Freshness"));
+
+ assertEquals("//org.bdii.Entries",
+ t.getMostRelevantRule("SITE-101", "bdii.host1.example.foo", "org.bdii.Entries"));
+
+ assertEquals("", t.getMostRelevantRule("SITE-101", "bdii.host1.example.foo", "org.bdii.Foo"));
+
+ assertEquals("WARNING", t.getStatusByRule("SITE-101/bdii.host1.example.foo/org.bdii.Freshness", opsMgr, "AND"));
+ assertEquals("CRITICAL", t.getStatusByRule("//org.bdii.Entries", opsMgr, "AND"));
+ assertEquals("WARNING", t.getStatusByRule("//org.bdii.Entries", opsMgr, "OR"));
+ assertEquals("CRITICAL", t.getStatusByRule("/bdii.host1.example.foo/org.bdii.Freshness", opsMgr, "AND"));
+ assertEquals("WARNING", t.getStatusByRule("/bdii.host1.example.foo/org.bdii.Freshness", opsMgr, "OR"));
+ assertEquals("",t.getStatusByRule("/bdii.host3.example.foo/org.bdii.Freshness", opsMgr, "AND")); //no critical or warning ranges defined
+
+ // Test parsing of label=value lists including space separation or not
+ assertEquals("{size=6754.0, time=3.714648}",t.getThresholdValues("time=3.714648s;;;0.000000 size=6754B;;;0").toString());
+ assertEquals("{time=0.037908}",t.getThresholdValues("time=0.037908s;;;0.000000;120.000000").toString());
+ assertEquals("{time=0.041992}",t.getThresholdValues("time=0.041992s;;;0.000000;120.000000").toString());
+ assertEquals("{entries=1.0, time=0.15}",t.getThresholdValues("time=0.15s;entries=1").toString());
+ assertEquals("{entries=1.0, freshness=111.0}",t.getThresholdValues("freshness=111s;entries=1").toString());
+ assertEquals("{entries=1.0, freshness=111.0}",t.getThresholdValues("freshness=111s; entries=1").toString());
+ assertEquals("{entries=1.0, freshness=111.0}",t.getThresholdValues("freshness=111s;;;entries=1").toString());
+ assertEquals("{entries=1.0, freshness=111.0}",t.getThresholdValues("freshness=111s;; entries=1").toString());
+ assertEquals("{TSSInstances=1.0}",t.getThresholdValues("TSSInstances=1").toString());
+
+ String thBig = "tls_ciphers=105.47s dir_head=0.69s dir_get=0.89s file_put=0.82s file_get=0.45s file_options=0.39s file_move=0.42s file_head=0.40s file_head_on_non_existent=0.38s file_propfind=0.40s file_delete=0.72s file_delete_on_non_existent=0.37s";
+ String expThBig = "{file_head_on_non_existent=0.38, file_put=0.82, file_delete_on_non_existent=0.37, file_delete=0.72, dir_head=0.69, file_head=0.4, file_propfind=0.4, dir_get=0.89, file_move=0.42, file_options=0.39, file_get=0.45, tls_ciphers=105.47}";
+
+ assertEquals(expThBig,t.getThresholdValues(thBig).toString());
+ }
+
+}
diff --git a/flink_jobs/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java b/flink_jobs/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java
index be912615..69199169 100644
--- a/flink_jobs/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java
+++ b/flink_jobs/stream_status/src/main/java/argo/streaming/AmsStreamStatus.java
@@ -64,6 +64,8 @@
* --sync.aps : availability profile used
* --sync.ops : operations profile used
* --sync.downtimes : initial downtime file (same for run date)
+ * --report : report name
+ * --report.uuid : report uuid
* Job optional cli parameters:
* --ams.batch : num of messages to be retrieved per request to AMS service
* --ams.interval : interval (in ms) between AMS service requests
@@ -160,6 +162,7 @@ public static void main(String[] args) throws Exception {
String project = parameterTool.getRequired("ams.project");
String subMetric = parameterTool.getRequired("ams.sub.metric");
String subSync = parameterTool.getRequired("ams.sub.sync");
+
// set ams client batch and interval to default values
int batch = 1;
@@ -206,7 +209,7 @@ public static void main(String[] args) throws Exception {
DataStream events = groupMdata.connect(syncB).flatMap(new StatusMap(conf));
- events.print();
+
if (hasKafkaArgs(parameterTool)) {
// Initialize kafka parameters
@@ -236,7 +239,7 @@ public static void main(String[] args) throws Exception {
MongoStatusOutput mongoOut = new MongoStatusOutput(parameterTool.get("mongo.uri"), "status_metrics",
"status_endpoints", "status_services", "status_endpoint_groups", parameterTool.get("mongo.method"),
- parameterTool.get("report"));
+ parameterTool.get("report.uuid"));
events.writeUsingOutputFormat(mongoOut);
}
@@ -421,6 +424,8 @@ private static class StatusMap extends RichCoFlatMapFunction downList = sd.readDowntime(config.downtime);
@@ -451,6 +456,7 @@ public void open(Configuration parameters) throws IOException, ParseException, U
// create a new status manager
sm = new StatusManager();
sm.setTimeout(config.timeout);
+ sm.setReport(config.report);
// load all the connector data
sm.loadAll(config.runDate, downList, egpListFull, mpsList, apsJSON, opsJSON);
@@ -524,10 +530,13 @@ public void flatMap2(String value, Collector out) throws IOException, Pa
// Decode from base64
byte[] decoded64 = Base64.decodeBase64(data.getBytes("UTF-8"));
JsonElement jAttr = jRoot.getAsJsonObject().get("attributes");
+
Map attr = SyncParse.parseAttributes(jAttr);
- if (attr.containsKey("type")) {
-
+ // The sync dataset should have a type and report attribute and report should be the job's report
+ if (attr.containsKey("type") && attr.containsKey("report") && attr.get("report") == config.report ) {
+
String sType = attr.get("type");
+ LOG.info("Accepted " + sType + " for report: " + attr.get("report"));
if (sType.equalsIgnoreCase("metric_profile")) {
// Update mps
ArrayList mpsList = SyncParse.parseMetricProfile(decoded64);
@@ -547,7 +556,6 @@ public void flatMap2(String value, Collector out) throws IOException, Pa
egpTrim.add(egpItem);
}
}
-
sm.egp = new EndpointGroupManagerV2();
sm.egp.loadFromList(egpTrim);
} else if (sType.equals("downtimes") && attr.containsKey("partition_date")) {
@@ -556,6 +564,8 @@ public void flatMap2(String value, Collector out) throws IOException, Pa
// Update downtime cache in status manager
sm.addDowntimeSet(pDate, downList);
}
+ } else {
+ LOG.info("Declined " + attr.get("type") + "for report: " + attr.get("report"));
}
}
diff --git a/flink_jobs/stream_status/src/main/java/argo/streaming/ArgoMessagingClient.java b/flink_jobs/stream_status/src/main/java/argo/streaming/ArgoMessagingClient.java
index dcf1d2b5..4e6e1527 100644
--- a/flink_jobs/stream_status/src/main/java/argo/streaming/ArgoMessagingClient.java
+++ b/flink_jobs/stream_status/src/main/java/argo/streaming/ArgoMessagingClient.java
@@ -32,7 +32,8 @@
import org.apache.http.client.methods.CloseableHttpResponse;
/**
- * Simple http client for pulling and acknowledging messages from AMS service http API
+ * Simple http client for pulling and acknowledging messages from AMS service
+ * http API
*/
public class ArgoMessagingClient {
@@ -53,9 +54,9 @@ public class ArgoMessagingClient {
private String maxMessages = "";
// ssl verify or not
private boolean verify = true;
- // proxy
+ // proxy
private URI proxy = null;
-
+
// Utility inner class for holding list of messages and acknowledgements
private class MsgAck {
String[] msgs;
@@ -78,8 +79,9 @@ public ArgoMessagingClient() {
this.maxMessages = "100";
this.proxy = null;
}
-
- public ArgoMessagingClient(String method, String token, String endpoint, String project, String sub, int batch, boolean verify) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+
+ public ArgoMessagingClient(String method, String token, String endpoint, String project, String sub, int batch,
+ boolean verify) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
this.proto = method;
this.token = token;
@@ -88,36 +90,36 @@ public ArgoMessagingClient(String method, String token, String endpoint, String
this.sub = sub;
this.maxMessages = String.valueOf(batch);
this.verify = verify;
-
+
this.httpClient = buildHttpClient();
-
+
}
-
+
/**
* Initializes Http Client (if not initialized during constructor)
- * @return
+ *
+ * @return
*/
- private CloseableHttpClient buildHttpClient() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+ private CloseableHttpClient buildHttpClient()
+ throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
if (this.verify) {
return this.httpClient = HttpClients.createDefault();
} else {
return this.httpClient = HttpClients.custom().setSSLSocketFactory(selfSignedSSLF()).build();
}
}
-
+
/**
- * Create an SSL Connection Socket Factory with a strategy to trust self signed certificates
+ * Create an SSL Connection Socket Factory with a strategy to trust self signed
+ * certificates
*/
- private SSLConnectionSocketFactory selfSignedSSLF() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+ private SSLConnectionSocketFactory selfSignedSSLF()
+ throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
SSLContextBuilder sslBuild = new SSLContextBuilder();
- sslBuild.loadTrustMaterial(null, new TrustSelfSignedStrategy());
-
- return new SSLConnectionSocketFactory(sslBuild.build(),NoopHostnameVerifier.INSTANCE);
-
-
+ sslBuild.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+ return new SSLConnectionSocketFactory(sslBuild.build(), NoopHostnameVerifier.INSTANCE);
}
-
-
+
/**
* Set AMS http client to use http proxy
*/
@@ -125,26 +127,37 @@ public void setProxy(String proxyURL) throws URISyntaxException {
// parse proxy url
this.proxy = URI.create(proxyURL);
}
-
+
/**
* Set AMS http client to NOT use an http proxy
*/
public void unsetProxy() {
- this.proxy=null;
+ this.proxy = null;
}
-
-
/**
* Create a configuration for using http proxy on each request
*/
private RequestConfig createProxyCfg() {
- HttpHost proxy = new HttpHost(this.proxy.getHost(),this.proxy.getPort(),this.proxy.getScheme());
+ HttpHost proxy = new HttpHost(this.proxy.getHost(), this.proxy.getPort(), this.proxy.getScheme());
RequestConfig config = RequestConfig.custom().setProxy(proxy).build();
return config;
}
+ public void logIssue(CloseableHttpResponse resp) throws UnsupportedOperationException, IOException {
+ InputStreamReader isRdr = new InputStreamReader(resp.getEntity().getContent());
+ BufferedReader bRdr = new BufferedReader(isRdr);
+ int statusCode = resp.getStatusLine().getStatusCode();
+ // Parse error content from api response
+ StringBuilder result = new StringBuilder();
+ String rLine;
+ while ((rLine = bRdr.readLine()) != null)
+ result.append(rLine);
+ isRdr.close();
+ Log.warn("ApiStatusCode={}, ApiErrorMessage={}", statusCode, result);
+
+ }
/**
* Properly compose url for each AMS request
@@ -173,11 +186,11 @@ public MsgAck doPull() throws IOException, KeyManagementException, NoSuchAlgorit
this.httpClient = buildHttpClient();
}
- // check for proxy
+ // check for proxy
if (this.proxy != null) {
postPull.setConfig(createProxyCfg());
}
-
+
CloseableHttpResponse response = this.httpClient.execute(postPull);
String msg = "";
String ackId = "";
@@ -185,7 +198,9 @@ public MsgAck doPull() throws IOException, KeyManagementException, NoSuchAlgorit
HttpEntity entity = response.getEntity();
- if (entity != null) {
+ int statusCode = response.getStatusLine().getStatusCode();
+
+ if (entity != null && statusCode == 200) {
InputStreamReader isRdr = new InputStreamReader(entity.getContent());
BufferedReader bRdr = new BufferedReader(isRdr);
@@ -198,13 +213,11 @@ public MsgAck doPull() throws IOException, KeyManagementException, NoSuchAlgorit
// Gather message from json
JsonParser jsonParser = new JsonParser();
// parse the json root object
-
+ Log.info("response: {}", result.toString());
JsonElement jRoot = jsonParser.parse(result.toString());
JsonArray jRec = jRoot.getAsJsonObject().get("receivedMessages").getAsJsonArray();
-
-
// if has elements
for (JsonElement jMsgItem : jRec) {
JsonElement jMsg = jMsgItem.getAsJsonObject().get("message");
@@ -214,9 +227,13 @@ public MsgAck doPull() throws IOException, KeyManagementException, NoSuchAlgorit
msgList.add(msg);
ackIdList.add(ackId);
}
-
+
isRdr.close();
+ } else {
+
+ logIssue(response);
+
}
response.close();
@@ -224,8 +241,6 @@ public MsgAck doPull() throws IOException, KeyManagementException, NoSuchAlgorit
String[] msgArr = msgList.toArray(new String[0]);
String[] ackIdArr = ackIdList.toArray(new String[0]);
-
-
// Return a Message array
return new MsgAck(msgArr, ackIdArr);
@@ -260,7 +275,6 @@ public String[] consume() throws KeyManagementException, NoSuchAlgorithmExceptio
} catch (IOException e) {
LOG.error(e.getMessage());
}
-
return msgs;
}
@@ -275,8 +289,8 @@ public String doAck(String ackId) throws IOException {
StringEntity postBody = new StringEntity("{\"ackIds\":[" + ackId + "]}");
postBody.setContentType("application/json");
postAck.setEntity(postBody);
-
- // check for proxy
+
+ // check for proxy
if (this.proxy != null) {
postAck.setConfig(createProxyCfg());
}
@@ -301,10 +315,11 @@ public String doAck(String ackId) throws IOException {
resMsg = result.toString();
isRdr.close();
+ } else {
+ // Log any API errors
+ logIssue(response);
}
-
response.close();
-
// Return a resposeMessage
return resMsg;
diff --git a/flink_jobs/stream_status/src/main/java/argo/streaming/StatusConfig.java b/flink_jobs/stream_status/src/main/java/argo/streaming/StatusConfig.java
index 94a673f9..9d423c4b 100644
--- a/flink_jobs/stream_status/src/main/java/argo/streaming/StatusConfig.java
+++ b/flink_jobs/stream_status/src/main/java/argo/streaming/StatusConfig.java
@@ -22,6 +22,8 @@ public class StatusConfig implements Serializable {
// Avro schema
public String avroSchema;
+ public String report;
+
// Sync files
public String aps;
public String mps;
@@ -50,6 +52,7 @@ public StatusConfig(ParameterTool pt){
this.ops = pt.getRequired("sync.ops");
this.runDate = pt.getRequired("run.date");
this.downtime = pt.getRequired("sync.downtime");
+ this.report = pt.getRequired("report");
// Optional timeout parameter
if (pt.has("timeout")){
this.timeout = pt.getLong("timeout");
diff --git a/flink_jobs/stream_status/src/main/java/ops/ConfigManager.java b/flink_jobs/stream_status/src/main/java/ops/ConfigManager.java
index 1853ecb3..511529fe 100644
--- a/flink_jobs/stream_status/src/main/java/ops/ConfigManager.java
+++ b/flink_jobs/stream_status/src/main/java/ops/ConfigManager.java
@@ -5,60 +5,57 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
-
-import java.util.Map.Entry;
+import java.util.List;
import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
-
-
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
+
public class ConfigManager {
private static final Logger LOG = Logger.getLogger(ConfigManager.class.getName());
public String id; // report uuid reference
- public String tenant;
public String report;
+ public String tenant;
public String egroup; // endpoint group
public String ggroup; // group of groups
- public String agroup; // alternative group
public String weight; // weight factor type
public TreeMap egroupTags;
public TreeMap ggroupTags;
public TreeMap mdataTags;
-
public ConfigManager() {
- this.tenant = null;
this.report = null;
this.id = null;
+ this.tenant = null;
this.egroup = null;
this.ggroup = null;
this.weight = null;
this.egroupTags = new TreeMap();
this.ggroupTags = new TreeMap();
this.mdataTags = new TreeMap();
-
+
}
public void clear() {
- this.id=null;
- this.tenant = null;
+ this.id = null;
this.report = null;
+ this.tenant = null;
this.egroup = null;
this.ggroup = null;
this.weight = null;
this.egroupTags.clear();
this.ggroupTags.clear();
this.mdataTags.clear();
-
+
}
public String getReportID() {
@@ -73,12 +70,11 @@ public String getTenant() {
return tenant;
}
+
public String getEgroup() {
return egroup;
}
-
-
public void loadJson(File jsonFile) throws IOException {
// Clear data
this.clear();
@@ -91,30 +87,35 @@ public void loadJson(File jsonFile) throws IOException {
JsonElement jElement = jsonParser.parse(br);
JsonObject jObj = jElement.getAsJsonObject();
// Get the simple fields
- this.id = jObj.getAsJsonPrimitive("id").getAsString();
- this.tenant = jObj.getAsJsonPrimitive("tenant").getAsString();
- this.report = jObj.getAsJsonPrimitive("job").getAsString();
- this.egroup = jObj.getAsJsonPrimitive("egroup").getAsString();
- this.ggroup = jObj.getAsJsonPrimitive("ggroup").getAsString();
- this.weight = jObj.getAsJsonPrimitive("weight").getAsString();
- this.agroup = jObj.getAsJsonPrimitive("altg").getAsString();
+ this.id = jObj.get("id").getAsString();
+ this.tenant = jObj.get("tenant").getAsString();
+ this.report = jObj.get("info").getAsJsonObject().get("name").getAsString();
+
+ // get topology schema names
+ JsonObject topoGroup = jObj.get("topology_schema").getAsJsonObject().getAsJsonObject("group");
+ this.ggroup = topoGroup.get("type").getAsString();
+ this.egroup = topoGroup.get("group").getAsJsonObject().get("type").getAsString();
+
+ this.weight = jObj.get("weight").getAsString();
// Get compound fields
- JsonObject jEgroupTags = jObj.getAsJsonObject("egroup_tags");
- JsonObject jGgroupTags = jObj.getAsJsonObject("ggroup_tags");
- JsonObject jMdataTags = jObj.getAsJsonObject("mdata_tags");
- JsonObject jDataMap = jObj.getAsJsonObject("datastore_maps");
- // Iterate fields
- for (Entry item : jEgroupTags.entrySet()) {
-
- this.egroupTags.put(item.getKey(), item.getValue().getAsString());
- }
- for (Entry item : jGgroupTags.entrySet()) {
-
- this.ggroupTags.put(item.getKey(), item.getValue().getAsString());
- }
- for (Entry item : jMdataTags.entrySet()) {
-
- this.mdataTags.put(item.getKey(), item.getValue().getAsString());
+ JsonArray jTags = jObj.getAsJsonArray("filter_tags");
+
+ // Iterate tags
+ if (jTags != null) {
+ for (JsonElement tag : jTags) {
+ JsonObject jTag = tag.getAsJsonObject();
+ String name = jTag.get("name").getAsString();
+ String value = jTag.get("value").getAsString();
+ String ctx = jTag.get("context").getAsString();
+ if (ctx.equalsIgnoreCase("group_of_groups")){
+ this.ggroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("endpoint_groups")){
+ this.egroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("metric_data")) {
+ this.mdataTags.put(name, value);
+ }
+
+ }
}
@@ -132,4 +133,56 @@ public void loadJson(File jsonFile) throws IOException {
}
+
+ /**
+ * Loads report configuration information from a config JSON string
+ *
+ */
+ public void loadJsonString(List confJson) throws JsonParseException {
+ // Clear data
+ this.clear();
+
+ try {
+
+ JsonParser jsonParser = new JsonParser();
+ // Grab the first - and only - line of JSON from the config data
+ JsonElement jElement = jsonParser.parse(confJson.get(0));
+ JsonObject jObj = jElement.getAsJsonObject();
+ // Get the simple fields
+ this.id = jObj.get("id").getAsString();
+ this.tenant = jObj.get("tenant").getAsString();
+ this.report = jObj.get("info").getAsJsonObject().get("name").getAsString();
+ // get topology schema names
+ JsonObject topoGroup = jObj.get("topology_schema").getAsJsonObject().getAsJsonObject("group");
+ this.ggroup = topoGroup.get("type").getAsString();
+ this.egroup = topoGroup.get("group").getAsJsonObject().get("type").getAsString();
+ this.weight = jObj.get("weight").getAsString();
+ // Get compound fields
+ JsonArray jTags = jObj.getAsJsonArray("tags");
+
+ // Iterate tags
+ if (jTags != null) {
+ for (JsonElement tag : jTags) {
+ JsonObject jTag = tag.getAsJsonObject();
+ String name = jTag.get("name").getAsString();
+ String value = jTag.get("value").getAsString();
+ String ctx = jTag.get("context").getAsString();
+ if (ctx.equalsIgnoreCase("group_of_groups")){
+ this.ggroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("endpoint_groups")){
+ this.egroupTags.put(name, value);
+ } else if (ctx.equalsIgnoreCase("metric_data")) {
+ this.mdataTags.put(name, value);
+ }
+
+ }
+ }
+
+ } catch (JsonParseException ex) {
+ LOG.error("Not valid json contents");
+ throw ex;
+ }
+
+ }
+
}
diff --git a/flink_jobs/stream_status/src/main/java/status/StatusManager.java b/flink_jobs/stream_status/src/main/java/status/StatusManager.java
index a24992d8..a8fc5559 100644
--- a/flink_jobs/stream_status/src/main/java/status/StatusManager.java
+++ b/flink_jobs/stream_status/src/main/java/status/StatusManager.java
@@ -43,7 +43,7 @@ public class StatusManager {
static Logger LOG = LoggerFactory.getLogger(StatusManager.class);
// Name of the report used
- String report;
+ private String report;
// Sync file structures necessary for status computation
public EndpointGroupManagerV2 egp = new EndpointGroupManagerV2();
@@ -70,6 +70,13 @@ public class StatusManager {
// trigger
String tsLatest;
+ public void setReport(String report) {
+ this.report = report;
+ }
+
+ public String getReport() {
+ return this.report;
+ }
public void setTimeout(Long timeout) {
this.timeout = timeout;
@@ -760,8 +767,9 @@ private String genEvent(String type, String group, String service, String hostna
toZulu(ts), tsProc, prevStatus, toZulu(prevTs), new Boolean(repeat).toString(), summary, message );
Gson gson = new Gson();
- LOG.debug("Event Generated: " + gson.toJson(evnt));
- return gson.toJson(evnt);
+ String evntJson = gson.toJson(evnt);
+ LOG.debug("Event Generated: " + evntJson);
+ return evntJson;
}
diff --git a/flink_jobs/stream_status/src/main/java/sync/AggregationProfileManager.java b/flink_jobs/stream_status/src/main/java/sync/AggregationProfileManager.java
index b43353dc..15a143c4 100644
--- a/flink_jobs/stream_status/src/main/java/sync/AggregationProfileManager.java
+++ b/flink_jobs/stream_status/src/main/java/sync/AggregationProfileManager.java
@@ -16,11 +16,14 @@
import java.util.List;
import java.util.Map.Entry;
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
+
+
public class AggregationProfileManager {
private HashMap list;
@@ -235,32 +238,36 @@ public void loadJson(File jsonFile) throws IOException {
JsonElement jRootElement = jsonParser.parse(br);
JsonObject jRootObj = jRootElement.getAsJsonObject();
- JsonObject apGroups = jRootObj.getAsJsonObject("groups");
-
// Create new entry for this availability profile
AvProfileItem tmpAvp = new AvProfileItem();
+ JsonArray apGroups = jRootObj.getAsJsonArray("groups");
+
tmpAvp.name = jRootObj.get("name").getAsString();
tmpAvp.namespace = jRootObj.get("namespace").getAsString();
- tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsString();
- tmpAvp.metricOp = jRootObj.get("metric_ops").getAsString();
- tmpAvp.groupType = jRootObj.get("group_type").getAsString();
- tmpAvp.op = jRootObj.get("operation").getAsString();
+ tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsJsonObject().get("name").getAsString();
+ tmpAvp.metricOp = jRootObj.get("metric_operation").getAsString();
+ tmpAvp.groupType = jRootObj.get("endpoint_group").getAsString();
+ tmpAvp.op = jRootObj.get("profile_operation").getAsString();
- for (Entry item : apGroups.entrySet()) {
+ for ( JsonElement item : apGroups) {
// service name
- String itemName = item.getKey();
- JsonObject itemObj = item.getValue().getAsJsonObject();
+ JsonObject itemObj = item.getAsJsonObject();
+ String itemName = itemObj.get("name").getAsString();
String itemOp = itemObj.get("operation").getAsString();
- JsonObject itemServices = itemObj.get("services").getAsJsonObject();
+ JsonArray itemServices = itemObj.get("services").getAsJsonArray();
tmpAvp.insertGroup(itemName, itemOp);
- for (Entry subItem : itemServices.entrySet()) {
- tmpAvp.insertService(itemName, subItem.getKey(), subItem.getValue().getAsString());
+ for (JsonElement subItem : itemServices) {
+ JsonObject subObj = subItem.getAsJsonObject();
+ String serviceName = subObj.get("name").getAsString();
+ String serviceOp = subObj.get("operation").getAsString();
+ tmpAvp.insertService(itemName, serviceName,serviceOp);
}
}
+
// Add profile to the list
this.list.put(tmpAvp.name, tmpAvp);
@@ -289,32 +296,36 @@ public void loadJsonString(String apsJson) throws IOException {
JsonElement jRootElement = jsonParser.parse(apsJson);
JsonObject jRootObj = jRootElement.getAsJsonObject();
- JsonObject apGroups = jRootObj.getAsJsonObject("groups");
-
// Create new entry for this availability profile
AvProfileItem tmpAvp = new AvProfileItem();
+ JsonArray apGroups = jRootObj.getAsJsonArray("groups");
+
tmpAvp.name = jRootObj.get("name").getAsString();
tmpAvp.namespace = jRootObj.get("namespace").getAsString();
- tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsString();
- tmpAvp.metricOp = jRootObj.get("metric_ops").getAsString();
- tmpAvp.groupType = jRootObj.get("group_type").getAsString();
- tmpAvp.op = jRootObj.get("operation").getAsString();
+ tmpAvp.metricProfile = jRootObj.get("metric_profile").getAsJsonObject().get("name").getAsString();
+ tmpAvp.metricOp = jRootObj.get("metric_operation").getAsString();
+ tmpAvp.groupType = jRootObj.get("endpoint_group").getAsString();
+ tmpAvp.op = jRootObj.get("profile_operation").getAsString();
- for (Entry item : apGroups.entrySet()) {
+ for ( JsonElement item : apGroups) {
// service name
- String itemName = item.getKey();
- JsonObject itemObj = item.getValue().getAsJsonObject();
+ JsonObject itemObj = item.getAsJsonObject();
+ String itemName = itemObj.get("name").getAsString();
String itemOp = itemObj.get("operation").getAsString();
- JsonObject itemServices = itemObj.get("services").getAsJsonObject();
+ JsonArray itemServices = itemObj.get("services").getAsJsonArray();
tmpAvp.insertGroup(itemName, itemOp);
- for (Entry subItem : itemServices.entrySet()) {
- tmpAvp.insertService(itemName, subItem.getKey(), subItem.getValue().getAsString());
+ for (JsonElement subItem : itemServices) {
+ JsonObject subObj = subItem.getAsJsonObject();
+ String serviceName = subObj.get("name").getAsString();
+ String serviceOp = subObj.get("operation").getAsString();
+ tmpAvp.insertService(itemName, serviceName,serviceOp);
}
}
+
// Add profile to the list
this.list.put(tmpAvp.name, tmpAvp);
diff --git a/flink_jobs/stream_status/src/main/resources/ops/ap1.json b/flink_jobs/stream_status/src/main/resources/ops/ap1.json
index 4940b57a..d754320c 100644
--- a/flink_jobs/stream_status/src/main/resources/ops/ap1.json
+++ b/flink_jobs/stream_status/src/main/resources/ops/ap1.json
@@ -1,35 +1,64 @@
{
-
- "name": "ap1",
- "namespace": "test",
- "metric_profile": "ch.cern.sam.ROC_CRITICAL",
- "metric_ops":"AND",
- "group_type": "sites",
- "operation":"AND",
- "groups": {
- "compute": {
- "services":{
- "CREAM-CE":"OR",
- "ARC-CE":"OR",
- "GRAM5":"OR",
- "unicore6.TargetSystemFactory":"OR",
- "QCG.Computing":"OR"
- },
- "operation":"OR"
- },
- "storage": {
- "services":{
- "SRM":"OR",
- "SRMv2":"OR"
- },
- "operation":"OR"
- },
- "information": {
- "services":{
- "Site-BDII":"OR"
- },
- "operation":"OR"
- }
-
- }
-}
\ No newline at end of file
+ "id": "297c368a-524f-4144-9eb6-924fae5f08fa",
+ "name": "ap1",
+ "namespace": "test",
+ "endpoint_group": "sites",
+ "metric_operation": "AND",
+ "profile_operation": "AND",
+ "metric_profile": {
+ "name": "CH.CERN.SAM.ARGO_MON_CRITICAL",
+ "id": "c81fdb7b-d8f8-4ff9-96c5-6a0c336e2b25"
+ },
+ "groups": [
+ {
+ "name": "compute",
+ "operation": "OR",
+ "services": [
+ {
+ "name": "CREAM-CE",
+ "operation": "OR"
+ },
+ {
+ "name": "ARC-CE",
+ "operation": "OR"
+ },
+ {
+ "name": "GRAM5",
+ "operation": "OR"
+ },
+ {
+ "name": "unicore6.TargetSystemFactory",
+ "operation": "OR"
+ },
+ {
+ "name": "QCG.Computing",
+ "operation": "OR"
+ }
+ ]
+ },
+ {
+ "name": "storage",
+ "operation": "OR",
+ "services": [
+ {
+ "name": "SRMv2",
+ "operation": "OR"
+ },
+ {
+ "name": "SRM",
+ "operation": "OR"
+ }
+ ]
+ },
+ {
+ "name": "information",
+ "operation": "OR",
+ "services": [
+ {
+ "name": "Site-BDII",
+ "operation": "OR"
+ }
+ ]
+ }
+ ]
+ }
diff --git a/flink_jobs/stream_status/src/main/resources/ops/ap2.json b/flink_jobs/stream_status/src/main/resources/ops/ap2.json
index e7eb2c7c..fda7868f 100644
--- a/flink_jobs/stream_status/src/main/resources/ops/ap2.json
+++ b/flink_jobs/stream_status/src/main/resources/ops/ap2.json
@@ -1,35 +1,54 @@
{
+ "id": "337c368a-524f-4144-9eb6-924fae5f08fa",
"name": "fedcloud",
"namespace": "egi",
- "metric_profile": "ch.cern.sam.CLOUD-MON",
- "metric_ops":"AND",
- "group_type": "sites",
- "operation":"AND",
- "groups": {
- "accounting": {
- "services":{
- "eu.egi.cloud.accounting":"OR"
- },
- "operation":"OR"
+ "endpoint_group": "sites",
+ "metric_operation": "AND",
+ "profile_operation": "AND",
+ "metric_profile": {
+ "name": "ch.cern.sam.CLOUD-MON",
+ "id": "c88fdb7b-d8f8-4ff9-96c5-6a0c336e2b25"
+ },
+ "groups": [
+ {
+ "name": "accounting",
+ "operation": "OR",
+ "services": [
+ {
+ "name": "eu.egi.cloud.accounting",
+ "operation": "OR"
+ }
+ ]
},
- "information": {
- "services":{
- "eu.egi.cloud.information.bdii":"OR"
- },
- "operation":"OR"
+ {
+ "name": "information",
+ "operation": "OR",
+ "services": [
+ {
+ "name": "eu.egi.cloud.information.bdii",
+ "operation": "OR"
+ }
+ ]
},
- "storage-management": {
- "services":{
- "eu.egi.cloud.storage-management.cdmi":"OR"
- },
- "operation":"OR"
+ {
+ "name": "storage-management",
+ "operation": "OR",
+ "services": [
+ {
+ "name": "eu.egi.cloud.storage-management.cdmi",
+ "operation": "OR"
+ }
+ ]
},
- "vm-management": {
- "services":{
- "eu.egi.cloud.vm-management.occi":"OR"
- },
- "operation":"OR"
+ {
+ "name": "vm-management",
+ "operation": "OR",
+ "services": [
+ {
+ "name": "eu.egi.cloud.vm-management.occi",
+ "operation": "OR"
+ }
+ ]
}
-
- }
+ ]
}
diff --git a/flink_jobs/stream_status/src/main/resources/ops/config.json b/flink_jobs/stream_status/src/main/resources/ops/config.json
index b7a83621..c2c550e5 100644
--- a/flink_jobs/stream_status/src/main/resources/ops/config.json
+++ b/flink_jobs/stream_status/src/main/resources/ops/config.json
@@ -1,24 +1,83 @@
{
- "tenant":"EGI",
- "id":"c800846f-8478-4af8-85d1-a3f12fe4c18f",
- "job":"Critical",
- "egroup":"SITES",
- "ggroup":"NGI",
- "altg":"ops",
- "weight":"hepspec",
- "egroup_tags":{
- "scope":"EGI",
- "production":"1",
- "monitored":"1"
- },
- "ggroup_tags":{
- "scope":"EGI",
- "infrastructure":"Production",
- "certification":"Certified"
- },
- "mdata_tags":{
- "vo":"ops",
- "vo_fqan":"ops",
- "roc":"any"
+ "id": "c800846f-8478-4af8-85d1-a3f12fe4c18f",
+ "info": {
+ "name": "Critical",
+ "description": "EGI report for Roc critical",
+ "created": "2015-10-19 10:35:49",
+ "updated": "2015-10-19 10:35:49"
+ },
+ "tenant": "EGI",
+ "topology_schema": {
+ "group": {
+ "type": "NGI",
+ "group": {
+ "type": "SITES"
+ }
+ }
+ },
+ "weight": "hepspec",
+ "profiles": [
+ {
+ "id": "433beb2c-45cc-49d4-a8e0-b132bb30327e",
+ "name": "ch.cern.sam.ROC_CRITICAL",
+ "type": "metric"
+ },
+ {
+ "id": "17d1462f-8f91-4728-a253-1a6e8e2e848d",
+ "name": "ops1",
+ "type": "operations"
+ },
+ {
+ "id": "1ef8c0c9-f9ef-4ca1-9ee7-bb8b36332036",
+ "name": "critical",
+ "type": "aggregation"
+ }
+ ],
+ "filter_tags": [
+ {
+ "name": "production",
+ "value": "1",
+ "context": "endpoint_groups"
+ },
+ {
+ "name": "monitored",
+ "value": "1",
+ "context": "endpoint_groups"
+ },
+ {
+ "name": "scope",
+ "value": "EGI",
+ "context": "endpoint_groups"
+ },
+ {
+ "name": "scope",
+ "value": "EGI",
+ "context": "group_of_groups"
+ },
+ {
+ "name": "infrastructure",
+ "value": "Production",
+ "context": "group_of_groups"
+ },
+ {
+ "name": "certification",
+ "value": "Certified",
+ "context": "group_of_groups"
+ },
+ {
+ "name": "vo",
+ "value": "ops",
+ "context": "metric_data"
+ },
+ {
+ "name": "vo_fqan",
+ "value": "ops",
+ "context": "metric_data"
+ },
+ {
+ "name": "roc",
+ "value": "any",
+ "context": "metric_data"
+ }
+ ]
}
-}
diff --git a/flink_jobs/stream_status/src/test/java/status/StatusManagerTest.java b/flink_jobs/stream_status/src/test/java/status/StatusManagerTest.java
index b05cd193..cd10e52b 100644
--- a/flink_jobs/stream_status/src/test/java/status/StatusManagerTest.java
+++ b/flink_jobs/stream_status/src/test/java/status/StatusManagerTest.java
@@ -41,6 +41,9 @@ public static void setUpBeforeClass() throws Exception {
@Test
public void test() throws URISyntaxException, IOException, ParseException {
+
+
+
// Prepare Resource File
URL resAPSJsonFile = StatusManagerTest.class.getResource("/ops/ap1.json");
File jsonAPSFile = new File(resAPSJsonFile.toURI());
@@ -58,7 +61,7 @@ public void test() throws URISyntaxException, IOException, ParseException {
File avroDownFile = new File(resDownAvroFile.toURI());
StatusManager sm = new StatusManager();
- sm.report="Critical";
+ sm.setReport("Critical");
sm.loadAllFiles("2017-03-03", avroDownFile, avroEGPFile, avroMPSFile, jsonAPSFile, jsonOPSFile);
Date ts1 = sm.fromZulu("2017-03-03T00:00:00Z");