Skip to content

Commit

Permalink
Added DASClient Code
Browse files Browse the repository at this point in the history
  • Loading branch information
madhawav committed Aug 22, 2017
1 parent 464eea4 commit 27addde
Show file tree
Hide file tree
Showing 32 changed files with 1,892 additions and 7 deletions.
28 changes: 28 additions & 0 deletions PySiddhi4/das/DASClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from PySiddhi4.das.EventSimulator.EventSimulatorClient import EventSimulatorClient
from PySiddhi4.das.SiddhiAppManagement.SiddhiAppManagementClient import SiddhiAppManagementClient


class DASClient(object):
'''
REST Client to work with WSO2 Data Analytics Server 4.0
'''
def __init__(self, host_url):
'''
Instantiate REST Client
:param host_url: host url of DAS4. E.g. http://localhost:9090
'''
self.host_url = host_url

def getSiddhiAppManagementClient(self):
'''
Obtain client for managing Siddhi Apps
:return:
'''
return SiddhiAppManagementClient(self.host_url + "/siddhi-apps")

def getEventSimulatorClient(self):
'''
Obtain client on event simulator
:return:
'''
return EventSimulatorClient(self.host_url + "/simulation")
90 changes: 90 additions & 0 deletions PySiddhi4/das/EventSimulator/AttributeConfiguration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from enum import Enum

from PySiddhi4.das.ObjectMapping.APIObject import APIObject, NotSet
from PySiddhi4.das.ObjectMapping.FieldMapping import FieldMapping, ListFieldMapping
from PySiddhi4.das.__Util import decodeField, decodeObject


class AttributeConfiguration(APIObject):
'''
Attribute Configuration API Object, which is an attribute of SimulationSource
'''

class Type(Enum):
'''
Type of Attribute Configuration
'''
CUSTOM_DATA_BASED = "CUSTOM_DATA_BASED"
PRIMITIVE_BASED = "PRIMITIVE_BASED"
REGEX_BASED = "REGEX_BASED"
PROPERTY_BASED = "PROPERTY_BASED"

@classmethod
def encode(cls, v):
return v.value

@classmethod
def decode(cls, v):
return AttributeConfiguration.Type(v)

class PrimitiveType(Enum):
'''
Type of primitive data type involved
'''
LONG = "LONG"
INT = "INT"
STRING = "STRING"
FLOAT = "FLOAT"
DOUBLE = "DOUBLE"

@classmethod
def encode(cls, v):
return v.value

@classmethod
def decode(cls, v):
return AttributeConfiguration.Type(v)

def __init__(self, type, min=NotSet(), max=NotSet(), length=NotSet(), precision=NotSet(), list=NotSet(),
pattern=NotSet(),
primitiveType=NotSet(), property=NotSet()):
'''
Instantiates Attribute Configuration API Object
:param type: Type of AttributeConfiguration
:param min:
:param max:
:param length:
:param precision:
:param list:
:param pattern:
:param primitiveType: PrimitiveType involved
:param property:
'''
self._setup(
field_mapping={"type": FieldMapping(AttributeConfiguration.Type.decode, AttributeConfiguration.Type.encode),
"length": FieldMapping(int),
"min": FieldMapping(int), "max": FieldMapping(int),
"precision": FieldMapping(int), "list": ListFieldMapping(str, str, NotSet()),
"pattern": FieldMapping(str), "property": FieldMapping(str),
"primitiveType": FieldMapping(AttributeConfiguration.PrimitiveType.decode,
AttributeConfiguration.PrimitiveType.encode)})
self.type = type
self.length = length
self.min = min
self.max = max
self.precision = precision
self.list = list
self.pattern = pattern
self.primitiveType = primitiveType
self.property = property

@classmethod
def parse(cls, jsonObject):
'''
Converts a Python Class Object (from JSON) to AttributeConfiguration
:param jsonObject:
:return:
'''
result = AttributeConfiguration(type=decodeField(jsonObject["type"], str))
result._parse(jsonObject)
return result
219 changes: 219 additions & 0 deletions PySiddhi4/das/EventSimulator/EventSimulatorClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import json
import logging

from PySiddhi4.das.__Communication.RestClient import RestClient
from PySiddhi4.das.EventSimulator.FeedSimulationConfiguration import FeedSimulationConfiguration


class EventSimulatorClient(RestClient):
'''
Client used to access DAS Event Simulator End Points
'''

def __init__(self, event_simulator_url):
'''
Instantiates EventSimulatorClient
:param event_simulator_url: url to event_simulator endpoint (e.g. root_url + '/simulation')
'''
RestClient.__init__(self, event_simulator_url)

def saveSimulationFeedConfiguration(self, simulationConfiguration):
'''
Saves a SimulationFeedConfiguration in WSO2 DAS Event Simulator
:param simulationConfiguration:
:return:
'''
r = self._sendPostRequest("/feed", data=json.dumps(simulationConfiguration.toJSONObject()))
if r.status_code == 201:
return True
elif r.status_code == 409:
raise Exception("EventSimulationConfiguration with same name already exists.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def runSimulationFeedConfiguration(self, simulationConfiguration):
'''
Runs a SimulationFeedConfiguration in WSO2 DAS Event Simulator
:param simulationConfiguration:
:return:
'''
r = self._sendPostRequest("/feed/" + simulationConfiguration.properties.simulationName + "/?action=run",
data=json.dumps(simulationConfiguration.toJSONObject()))
if r.status_code == 200:
return True
elif r.status_code == 404:
raise Exception("EventSimulationConfiguration with given name does not exist.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def pauseSimulationFeedConfiguration(self, simulationName):
'''
Pauses a SimulationFeedConfiguration in WSO2 DAS Event Simulator
:param simulationName:
:return:
'''
r = self._sendPostRequest("/feed/" + simulationName + "/?action=pause")
if r.status_code == 200:
return True
elif r.status_code == 404:
raise Exception("EventSimulationConfiguration with given name does not exist.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def resumeSimulationFeedConfiguration(self, simulationName):
'''
Resumes a SimulationFeedConfiguration in WSO2 DAS Event Simulator
:param simulationName:
:return:
'''
r = self._sendPostRequest("/feed/" + simulationName + "/?action=resume")
if r.status_code == 200:
return True
elif r.status_code == 404:
raise Exception("EventSimulationConfiguration with given name does not exist.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def stopSimulationFeedConfiguration(self, simulationName):
'''
Stops a SimulationFeedConfiguration in WSO2 DAS Event Simulator
:param simulationName:
:return:
'''
r = self._sendPostRequest("/feed/" + simulationName + "/?action=stop")
if r.status_code == 200:
return True
elif r.status_code == 404:
raise Exception("EventSimulationConfiguration with given name does not exist.")
elif r.status_code == 409:
raise Exception("EventSimulation is already stopped.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def editSimulationFeedConfiguration(self, simulationName, simulationConfiguration):
'''
Edits a SimulationFeedConfiguration in WSO2 DAS Event Simulator
:param simulationName:
:param simulationConfiguration: new simulationNameConfiguration
:return:
'''
r = self._sendPutRequest("/feed/" + simulationName, data=json.dumps(simulationConfiguration.toJSONObject()))
if r.status_code == 200:
return True
elif r.status_code == 404:
raise Exception("EventSimulationConfiguration with specified name does not exist.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def deleteSimulationFeedConfiguration(self, simulationName):
'''
Deletes a SimulationFeedConfiguration in WSO2 DAS Event Simulator
:param simulationName:
:return:
'''
r = self._sendDeleteRequest("/feed/" + simulationName)
if r.status_code == 200:
return True
elif r.status_code == 404:
raise Exception("EventSimulationConfiguration with specified name does not exist.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def retrieveSimulationFeedConfiguration(self, simulationName):
'''
Retrieves a SimulationFeedConfiguration from WSO2 DAS Event Simulator
:param simulationName:
:return:
'''
r = self._sendGetRequest("/feed/" + simulationName)
if r.status_code == 200:
result = r.json()
if result["status"].lower() == "ok":
jsonObject = json.loads(result["message"])["Simulation configuration"]
return FeedSimulationConfiguration.parse(jsonObject)
else:
raise Exception("Respose says not ok")
elif r.status_code == 404:
raise Exception("EventSimulationConfiguration with specified name does not exist.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def simulateSingleEvent(self, singleSimulationConfiguration):
'''
Invokes a Single Simulation in WSO2 DAS Event Simulator
:param singleSimulationConfiguration:
:return:
'''
logging.info("Sending: " + json.dumps(singleSimulationConfiguration.toJSONObject()))
r = self._sendPostRequest("/single", data=json.dumps(singleSimulationConfiguration.toJSONObject()))
if r.status_code == 200:
logging.info("Received: " + r.text)
result = r.json()
if result["status"].lower() == "ok":
return True
else:
raise Exception("Respose says not ok")
elif r.status_code == 409:
raise Exception("EventSimulationConfiguration with same name already exists.")
else:
raise Exception(str(r.status_code) + ": " + r.text)

def uploadCSV(self, fileName, stream=None, path=None):
'''
Uploads a CSV to WSO2 DAS Event Simulator. Only one of the parameters stream or path should be given.
:param fileName: fileName of file to be uploaded
:param stream: stream of file to be uploaded
:param path: path of file to be uploaded
:return:
'''
files = {}
if stream is not None:
files = {"file": (fileName, stream)}
else:
files = {"file": (fileName, open(path, "rb"))}
r = self._sendPostRequest("/files", files=files)

logging.info(r)

if r.status_code == 201:
return True
else:
raise Exception(str(r.status_code) + ": " + r.text)

def updateCSV(self, uploadedFileName, newFileName, stream=None, path=None):
'''
Updates a CSV file uploaded to WSO2 DAS Event Simulator. Only one of parameters stream or path should
be provided.
:param uploadedFileName: previous file name
:param newFileName: new file name
:param stream: stream of file to be uploaded
:param path: path of file to be uploaded
:return:
'''
files = {}
if stream is not None:
files = {"file": (newFileName, stream)}
else:
files = {"file": (newFileName, open(path, "rb"))}
r = self._sendPutRequest("/files/" + uploadedFileName, files=files)

logging.info(r)

if r.status_code == 200:
return True
else:
raise Exception(str(r.status_code) + ": " + r.text)

def deleteCSV(self, fileName):
'''
Deletes a CSV file uploaded to WSO2 DAS Event Simulator
:param fileName:
:return:
'''
r = self._sendDeleteRequest("/files/" + fileName)
logging.info(r)

if r.status_code == 200:
return True
else:
raise Exception(str(r.status_code) + ": " + r.text)
38 changes: 38 additions & 0 deletions PySiddhi4/das/EventSimulator/FeedSimulationConfiguration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from PySiddhi4.das.EventSimulator.SimulationProperties import SimulationProperties
from PySiddhi4.das.EventSimulator.SimulationSource import SimulationSource
from PySiddhi4.das.ObjectMapping.APIObject import APIObject
from PySiddhi4.das.ObjectMapping.FieldMapping import FieldMapping, ListFieldMapping


class FeedSimulationConfiguration(APIObject):
'''
FeedSimulationConfiguration API Object which could be passed to WSO2 DAS Event Simulator via EventSimulatorClient.
'''

def __init__(self, simulation_name=None, properties=None):
'''
Instantiates FeedSimulationConfiguration.
:param simulation_name: name of simulation
:param properties: SimulationProperties
'''
self._setup(
field_mapping={"properties": FieldMapping(SimulationProperties.parse, SimulationProperties.toJSONObject),
"sources": ListFieldMapping(SimulationSource.parse, SimulationSource.toJSONObject, [])})
if properties is not None:
self.properties = properties
elif simulation_name is not None:
self.properties = SimulationProperties(simulationName=simulation_name)
else:
self.properties = SimulationProperties()
self.sources = []

@classmethod
def parse(cls, jsonObject):
'''
Converts a Python Class Object (from JSON) to FeedSimulationConfiguration.
:param jsonObject:
:return:
'''
result = FeedSimulationConfiguration(simulation_name=jsonObject["properties"]["simulationName"])
result._parse(jsonObject)
return result
Loading

0 comments on commit 27addde

Please sign in to comment.