From 823c5c2bac29c34647d1a8b34221625e6d5396e5 Mon Sep 17 00:00:00 2001 From: Andrew Date: Fri, 13 Mar 2020 13:08:17 -0400 Subject: [PATCH 01/88] Added functions and documentation to support PM800 Power meters --- .../driver_configuration/modbus-tk-driver.rst | 5 +- .../interfaces/modbus_tk/helpers.py | 80 ++++++++++++++++++- .../interfaces/modbus_tk/maps/__init__.py | 3 + 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/docs/source/core_services/drivers/driver_configuration/modbus-tk-driver.rst b/docs/source/core_services/drivers/driver_configuration/modbus-tk-driver.rst index 84d7c8cfa0..eb67b47658 100644 --- a/docs/source/core_services/drivers/driver_configuration/modbus-tk-driver.rst +++ b/docs/source/core_services/drivers/driver_configuration/modbus-tk-driver.rst @@ -129,8 +129,9 @@ Each row configures a register definition on the device. Default is FALSE. - **Default Value** (Optional) - The point's default value. If it is reverted by an agent, it changes back to this value. If this value is missing, it will revert to the last known value not set by an agent. - - **Transform** (Optional) - Scaling algorithm: scale(multiplier), scale_int(multiplier), mod10k(reverse), - or none. Default is an empty string. + - **Transform** (Optional) - Scaling algorithm: scale(multiplier), scale_int(multiplier), scale_reg(register_name), + scale_reg_power10(register_name), scale_decimal_int_signed(multiplier), mod10k(reverse), + mod10k64(reverse), mod10k48(reveres) or none. Default is an empty string. - **Table** (Optional) - Standard modbus table name defining how information is stored in slave device. There are 4 different tables: - discrete_output_coils: read/write coil numbers 1-9999 diff --git a/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/helpers.py b/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/helpers.py index 705b3b530a..ebbdf58cd6 100644 --- a/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/helpers.py +++ b/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/helpers.py @@ -151,7 +151,7 @@ def parse_transform_arg(func, arg): :return: the correct argument or raise exception if not matched """ parse_arg = arg - if func in (scale, scale_int): + if func in (scale, scale_int, scale_decimal_int_signed): if type(arg) not in (int, long, float): try: parse_arg = int(arg, 10) @@ -185,6 +185,37 @@ def transform_func_helper(multiple_lst): except TypeError: #string return value +def scale_decimal_int_signed(multiplier): + """ + Scales modbus float value that is stored as a decimal number, not using + standard signing rollover, as the PM800 Power Factor Registers. + Inverse_func is applied just before writing the value over modbus. + + :param multiplier: Scale multiplier, eg 0.001 + :return: Returns a function used by the modbus client. + """ + multiplier = parse_transform_arg(scale_decimal_int_signed, multiplier) + + def func(value): + if value < 0: + return multiplier * (0 - (value + (32768))) + else: + return multiplier * value + + def inverse_func(value): + try: + try: + if value < 0: + return (0 - (value / float(multiplier))) - 0xFFFF + else: + return (value / float(multipliers)) + except TypeError: #string + return value + except ZeroDivisionError: + return None + + func.inverse = inverse_func + return func def scale(multiplier): """ @@ -304,3 +335,50 @@ def mod10k_value(value): return low * 10000 + high return mod10k_value + + +def mod10k64(reverse=False): + """ + Converts the PM800 64 bit 10K + + + @todo This works for postive values but not negative. + The reason is that each of the 2 16-bit modbus registers come over + signed so they need to be split out in the modbus conversion. + """ + reverse = parse_transform_arg(mod10k64, reverse) + + def mod10k_value(value): + r4 = (value >> 48) & 0xFFFF + r3 = (value >> 32) & 0xFFFF + r2 = (value >> 16) & 0xFFFF + r1 = value & 0xFFFF + if not reverse: + return (r1 * 10000**3) + (r2 * 10000**2) + (r3 * 10000) + r4 + else: + return (r4 * 10000**3) + (r3 * 10000**2) + (r2 * 10000) + r1 + + return mod10k_value + + +def mod10k48(reverse=False): + """ + Converts the PM800 INT48-M10K register format. + + @todo This works for postive values but not negative. + The reason is that each of the 2 16-bit modbus registers come over + signed so they need to be split out in the modbus conversion. + """ + reverse = parse_transform_arg(mod10k48, reverse) + + def mod10k_value(value): + r4 = (value >> 48) & 0xFFFF + r3 = (value >> 32) & 0xFFFF + r2 = (value >> 16) & 0xFFFF + r1 = value & 0xFFFF + if not reverse: + return (r2 * 10000**2) + (r3 * 10000) + r4 + else: + return (r1 * 10000**2) + (r2 * 10000) + r3 + + return mod10k_value diff --git a/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/maps/__init__.py b/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/maps/__init__.py index b35071894c..b9ea8bece4 100644 --- a/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/maps/__init__.py +++ b/services/core/MasterDriverAgent/master_driver/interfaces/modbus_tk/maps/__init__.py @@ -82,7 +82,10 @@ class MapException(Exception): transform_map = dict( scale=helpers.scale, scale_int=helpers.scale_int, + scale_decimal_int_signed=helpers.scale_decimal_int_signed, mod10k=helpers.mod10k, + mod10k64=helpers.mod10k64, + mod10k48=helpers.mod10k48, scale_reg=helpers.scale_reg, scale_reg_pow_10=helpers.scale_reg_pow_10 ) From d94d536e19de0482dccef33eb8fa74ea0a649453 Mon Sep 17 00:00:00 2001 From: kini136 Date: Mon, 6 Apr 2020 13:55:17 -0700 Subject: [PATCH 02/88] Added runtime_limit option to have listener automatical stop after interval --- examples/ListenerAgent/listener/agent.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/examples/ListenerAgent/listener/agent.py b/examples/ListenerAgent/listener/agent.py index 1ba1443801..5ec5e16645 100644 --- a/examples/ListenerAgent/listener/agent.py +++ b/examples/ListenerAgent/listener/agent.py @@ -40,6 +40,7 @@ import logging import sys from pprint import pformat +import datetime from volttron.platform.agent import utils from volttron.platform.messaging.health import STATUS_GOOD @@ -52,6 +53,7 @@ DEFAULT_MESSAGE = 'Listener Message' DEFAULT_AGENTID = "listener" DEFAULT_HEARTBEAT_PERIOD = 5 +RUNTIME_LIMIT = 600 class ListenerAgent(Agent): @@ -61,11 +63,22 @@ class ListenerAgent(Agent): def __init__(self, config_path, **kwargs): super().__init__(**kwargs) + self.flag = True self.config = utils.load_config(config_path) self._agent_id = self.config.get('agentid', DEFAULT_AGENTID) self._message = self.config.get('message', DEFAULT_MESSAGE) self._heartbeat_period = self.config.get('heartbeat_period', DEFAULT_HEARTBEAT_PERIOD) + try: + self.runtime_limit = int(self.config.get('runtime_limit', '')) + self.stop_time = datetime.datetime.now() + datetime.timedelta(seconds=self.runtime_limit) + _log.debug('Listener agent will stop at {}'.format(self.stop_time)) + except: + _log.debug('Runtime limit is not given') + self.flag = False + + if self.flag: + self.core.schedule(self.stop_time, self.core.stop) try: self._heartbeat_period = int(self._heartbeat_period) except: @@ -84,6 +97,7 @@ def __init__(self, config_path, **kwargs): @Core.receiver('onsetup') def onsetup(self, sender, **kwargs): # Demonstrate accessing a value from the config file + self.start_time = datetime.datetime.now() _log.info(self.config.get('message', DEFAULT_MESSAGE)) self._agent_id = self.config.get('agentid') @@ -98,7 +112,7 @@ def onstart(self, sender, **kwargs): _log.info('query: %r', query.query('serverkey').get()) @PubSub.subscribe('pubsub', '') - def on_match(self, peer, sender, bus, topic, headers, message): + def on_match(self, peer, sender, bus, topic, headers, message): """Use match_all to receive all messages and print them out.""" self._logfn( "Peer: {0}, Sender: {1}:, Bus: {2}, Topic: {3}, Headers: {4}, " From 361a0c49b85bd024ac59d59e9f12a2ec02423516 Mon Sep 17 00:00:00 2001 From: kini136 Date: Mon, 6 Apr 2020 14:04:23 -0700 Subject: [PATCH 03/88] added runtime_limit parameter in the config file --- examples/ListenerAgent/config | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/ListenerAgent/config b/examples/ListenerAgent/config index 2619210fad..585d2a2107 100644 --- a/examples/ListenerAgent/config +++ b/examples/ListenerAgent/config @@ -7,4 +7,6 @@ # verbosity is decreased from left to right above # default: INFO "log-level": "INFO" + # stop time in seconds + "runtime_limit":30 } From 5c336d5f1f8233fa9bf33d1bbc90e2c3ec2ecdbc Mon Sep 17 00:00:00 2001 From: kini136 Date: Mon, 6 Apr 2020 18:19:30 -0700 Subject: [PATCH 04/88] Modification after sanity check --- examples/ListenerAgent/config | 6 ++---- examples/ListenerAgent/listener/agent.py | 2 -- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/examples/ListenerAgent/config b/examples/ListenerAgent/config index 585d2a2107..b0be919f76 100644 --- a/examples/ListenerAgent/config +++ b/examples/ListenerAgent/config @@ -1,12 +1,10 @@ { - - "agentid": "listener1", "message": "hello", + # stop time in seconds + "runtime_limit":30, # log-level can be DEBUG, INFO, WARN or ERROR # verbosity is decreased from left to right above # default: INFO "log-level": "INFO" - # stop time in seconds - "runtime_limit":30 } diff --git a/examples/ListenerAgent/listener/agent.py b/examples/ListenerAgent/listener/agent.py index 5ec5e16645..c20b59a8ee 100644 --- a/examples/ListenerAgent/listener/agent.py +++ b/examples/ListenerAgent/listener/agent.py @@ -53,7 +53,6 @@ DEFAULT_MESSAGE = 'Listener Message' DEFAULT_AGENTID = "listener" DEFAULT_HEARTBEAT_PERIOD = 5 -RUNTIME_LIMIT = 600 class ListenerAgent(Agent): @@ -97,7 +96,6 @@ def __init__(self, config_path, **kwargs): @Core.receiver('onsetup') def onsetup(self, sender, **kwargs): # Demonstrate accessing a value from the config file - self.start_time = datetime.datetime.now() _log.info(self.config.get('message', DEFAULT_MESSAGE)) self._agent_id = self.config.get('agentid') From 0772797e6f120f58fa395e8a004d98388e1c65d1 Mon Sep 17 00:00:00 2001 From: kini136 Date: Mon, 6 Apr 2020 20:06:03 -0700 Subject: [PATCH 05/88] Remove the additional line of code and made local variable. This change also handles negative numbers. --- examples/ListenerAgent/listener/agent.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/examples/ListenerAgent/listener/agent.py b/examples/ListenerAgent/listener/agent.py index c20b59a8ee..8ef516f6ce 100644 --- a/examples/ListenerAgent/listener/agent.py +++ b/examples/ListenerAgent/listener/agent.py @@ -62,22 +62,20 @@ class ListenerAgent(Agent): def __init__(self, config_path, **kwargs): super().__init__(**kwargs) - self.flag = True self.config = utils.load_config(config_path) self._agent_id = self.config.get('agentid', DEFAULT_AGENTID) self._message = self.config.get('message', DEFAULT_MESSAGE) self._heartbeat_period = self.config.get('heartbeat_period', DEFAULT_HEARTBEAT_PERIOD) - try: - self.runtime_limit = int(self.config.get('runtime_limit', '')) - self.stop_time = datetime.datetime.now() + datetime.timedelta(seconds=self.runtime_limit) - _log.debug('Listener agent will stop at {}'.format(self.stop_time)) - except: - _log.debug('Runtime limit is not given') - self.flag = False - if self.flag: - self.core.schedule(self.stop_time, self.core.stop) + runtime_limit = int(self.config.get('runtime_limit', 0)) + if runtime_limit and runtime_limit > 0: + stop_time = datetime.datetime.now() + datetime.timedelta(seconds=runtime_limit) + _log.info('Listener agent will stop at {}'.format(stop_time)) + self.core.schedule(stop_time, self.core.stop) + else: + _log.info('No valid runtime_limit configured; listener agent will run until manually stopped') + try: self._heartbeat_period = int(self._heartbeat_period) except: From 0c1634d0b7707d833ffab05f07ac8ed0a32eeaff Mon Sep 17 00:00:00 2001 From: kini136 Date: Fri, 8 May 2020 10:50:30 -0700 Subject: [PATCH 06/88] modified for restarting playback and wrapping metadata as dict --- examples/DataPublisher/datapublisher/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/DataPublisher/datapublisher/agent.py b/examples/DataPublisher/datapublisher/agent.py index 0efdb81e2d..2a123371a5 100644 --- a/examples/DataPublisher/datapublisher/agent.py +++ b/examples/DataPublisher/datapublisher/agent.py @@ -215,7 +215,7 @@ def build_metadata(name_map, unittype_map): results = defaultdict(dict) for topic, point in name_map.values(): unit_type = Publisher._get_unit(point, unittype_map) - results[topic][point] = unit_type + results[topic][point] = {"unit": unit_type} return results def build_maps(self, fieldnames, base_path): @@ -350,7 +350,7 @@ def publish_loop(self): # Reset data frequency counter. self._next_allowed_publish = None if not isinstance(self._input_data, list): - handle = open(self._input_data, 'rb') + handle = open(self._input_data, 'r') self._data = csv.DictReader(handle) @RPC.export From 866fb319a7a11c7f2f8c5226c65691eddeeaad2b Mon Sep 17 00:00:00 2001 From: sgilbride Date: Fri, 15 May 2020 00:56:45 -0700 Subject: [PATCH 07/88] Added json-api to master web. Known issues: Anyone can access any of the RPC calls that the master web service can access Certain errors are not displayed correctly. Current api format: POST request { "id": , "method": , "params": } --- .../core/VolttronCentral/rpc_test_client.py | 2 +- .../VolttronCentral/volttroncentral/agent.py | 2 - volttron/platform/web/master_web_service.py | 120 ++++++++++++++++-- 3 files changed, 111 insertions(+), 13 deletions(-) diff --git a/services/core/VolttronCentral/rpc_test_client.py b/services/core/VolttronCentral/rpc_test_client.py index 57f4063e90..ae03cfedc3 100644 --- a/services/core/VolttronCentral/rpc_test_client.py +++ b/services/core/VolttronCentral/rpc_test_client.py @@ -13,7 +13,7 @@ def do_rpc(method, params=None ): json_package = { 'jsonrpc': '2.0', 'id': '2503402', - 'method':method, + 'method': method, } if authentication: diff --git a/services/core/VolttronCentral/volttroncentral/agent.py b/services/core/VolttronCentral/volttroncentral/agent.py index a0c25b2811..3df119c190 100644 --- a/services/core/VolttronCentral/volttroncentral/agent.py +++ b/services/core/VolttronCentral/volttroncentral/agent.py @@ -243,8 +243,6 @@ def _configure(self, config_name, action, contents): self._authenticated_sessions = SessionHandler(Authenticate(users)) - self.vip.web.register_endpoint(r'/vc/jsonrpc', self.jsonrpc) - self.vip.web.register_websocket(r'/vc/ws', self.open_authenticate_ws_endpoint, self._ws_closed, diff --git a/volttron/platform/web/master_web_service.py b/volttron/platform/web/master_web_service.py index 700f1f09dd..cd7a2db5d1 100644 --- a/volttron/platform/web/master_web_service.py +++ b/volttron/platform/web/master_web_service.py @@ -50,7 +50,6 @@ from cryptography.hazmat.primitives import serialization from gevent import Greenlet from jinja2 import Environment, FileSystemLoader, select_autoescape -from volttron.platform.agent.web import Response from ws4py.server.geventserver import WSGIServer @@ -58,21 +57,30 @@ from .authenticate_endpoint import AuthenticateEndpoints from .csr_endpoints import CSREndpoints from .webapp import WebApplicationWrapper +from volttron.platform.agent import utils +from volttron.platform.agent.known_identities import CONTROL from ..agent.utils import get_fq_identity from ..agent.web import Response, JsonResponse from ..auth import AuthEntry, AuthFile, AuthFileEntryAlreadyExists from ..certs import Certs, CertWrapper from ..jsonrpc import (json_result, json_validate_request, - UNAUTHORIZED) -from ..vip.agent import Agent, Core, RPC + INVALID_REQUEST, METHOD_NOT_FOUND, + UNHANDLED_EXCEPTION, UNAUTHORIZED, + UNAVAILABLE_PLATFORM, INVALID_PARAMS, + UNAVAILABLE_AGENT, INTERNAL_ERROR) + +from ..vip.agent import Agent, Core, RPC, Unreachable from ..vip.agent.subsystems import query from ..vip.socket import encode_key -from ...platform import jsonapi, get_platform_config +from ...platform import jsonapi, jsonrpc, get_platform_config from ...platform.aip import AIPplatform from ...utils import is_ip_private from ...utils.rmq_config_params import RMQConfig +# must be after importing of utils which imports grequest. +import requests + _log = logging.getLogger(__name__) @@ -208,7 +216,7 @@ def get_serverkey(self): def get_volttron_central_address(self): """Return address of external Volttron Central - Note: this only applies to Volltron Central agents that are + Note: this only applies to Volttron Central agents that are running on a different platform. """ return self.volttron_central_address @@ -621,6 +629,100 @@ def _sendfile(self, env, start_response, filename): return FileWrapper(open(filename, 'rb')) + def _to_jsonrpc_obj(self, jsonrpcstr): + """ Convert data string into a JsonRpcData named tuple. + + :param object data: Either a string or a dictionary representing a json document. + """ + return jsonrpc.JsonRpcData.parse(jsonrpcstr) + + def jsonrpc(self, env, data): + """ The main entry point for ^jsonrpc data + + This method will only accept rpcdata. The first time this method + is called, per session, it must be using get_authorization. That + will return a session token that must be included in every + subsequent request. The session is tied to the ip address + of the caller. + + :param object env: Environment dictionary for the request. + :param object data: The JSON-RPC 2.0 method to call. + :return object: An JSON-RPC 2.0 response. + """ + if env['REQUEST_METHOD'].upper() != 'POST': + return JsonResponse(jsonapi.dumps(jsonrpc.json_error('NA', INVALID_REQUEST, + 'Invalid request method, only POST allowed'))) + + try: + rpcdata = self._to_jsonrpc_obj(data) + _log.info('rpc method: {}'.format(rpcdata.method)) + # Authentication url + # This does not need to be local, however for now we are going to + # make it so assuming only one level of authentication. + # auth_url = "{url_scheme}://{HTTP_HOST}/authenticate".format( + # url_scheme=env['wsgi.url_scheme'], + # HTTP_HOST=env['HTTP_HOST']) + # args = {'username': rpcdata.params['username'], + # 'password': rpcdata.params['password'], + # 'ip': env['REMOTE_ADDR']} + # resp = requests.post(auth_url, json=args, verify=False) + # + # if resp.ok and resp.text: + # from volttron.platform.web import get_bearer, NotAuthorized + # try: + # claims = self.get_user_claims(resp.text) + # except NotAuthorized: + # _log.info('Invalid username/password for {}'.format( + # rpcdata.params['username'])) + # return JsonResponse(jsonrpc.json_error( + # rpcdata.id, UNAUTHORIZED, + # "Invalid username/password specified.")) + + _log.debug('RPC METHOD IS: {}'.format(rpcdata.method)) + if not rpcdata.method: + return JsonResponse(jsonapi.dumps(jsonrpc.json_error( + 'NA', INVALID_REQUEST, 'Invalid rpc data {}'.format(data)))) + else: + if rpcdata.params: + result_or_error = self.vip.rpc(rpcdata.id, rpcdata.method, rpcdata.params).get() + else: + result_or_error = self.vip.rpc(rpcdata.id, rpcdata.method).get() + + except AssertionError: + return JsonResponse(jsonapi.dumps(jsonrpc.json_error( + 'NA', INVALID_REQUEST, 'Invalid rpc data {}'.format(data)))) + except Unreachable: + return JsonResponse(jsonapi.dumps(jsonrpc.json_error( + rpcdata.id, UNAVAILABLE_PLATFORM, + "Couldn't reach platform with method {} params: {}".format( + rpcdata.method, + rpcdata.params)))) + except Exception as e: + + return JsonResponse(jsonapi.dumps(jsonrpc.json_error( + 'NA', UNHANDLED_EXCEPTION, e + ))) + + return JsonResponse(jsonapi.dumps(self._get_jsonrpc_response(rpcdata.id, result_or_error))) + + def _get_jsonrpc_response(self, id, result_or_error): + """ Wrap the response in either a json-rpc error or result. + + :param id: + :param result_or_error: + :return: + """ + if isinstance(result_or_error, dict): + if 'jsonrpc' in result_or_error: + return result_or_error + + if result_or_error is not None and isinstance(result_or_error, dict): + if 'error' in result_or_error: + error = result_or_error['error'] + _log.debug("RPC RESPONSE ERROR: {}".format(error)) + return jsonrpc.json_error(id, error['code'], error['message']) + return jsonrpc.json_result(id, result_or_error) + @Core.receiver('onstart') def startupagent(self, sender, **kwargs): @@ -660,11 +762,9 @@ def startupagent(self, sender, **kwargs): _log.info('Starting web server binding to {}://{}:{}.'.format(parsed.scheme, hostname, port)) # Handle the platform.web routes here. - self.registeredroutes.append((re.compile('^/discovery/$'), 'callable', - self._get_discovery)) - self.registeredroutes.append((re.compile('^/discovery/allow$'), - 'callable', - self._allow)) + self.registeredroutes.append((re.compile('^/discovery/$'), 'callable', self._get_discovery)) + self.registeredroutes.append((re.compile('^/discovery/allow$'), 'callable', self._allow)) + self.registeredroutes.append((re.compile(r'/jsonrpc'), 'callable', self.jsonrpc)) # these routes are only available for rmq based message bus # at present. if self.core.messagebus == 'rmq': From 234193fbe225fda216f27814cf67b72c47a58d4b Mon Sep 17 00:00:00 2001 From: sgilbride Date: Fri, 15 May 2020 18:04:10 -0700 Subject: [PATCH 08/88] Fixed api to properly handle parameters from json --- services/core/ForwardHistorian/config | 2 +- volttron/platform/web/master_web_service.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/core/ForwardHistorian/config b/services/core/ForwardHistorian/config index c4d8f2aa96..9821bc68b9 100644 --- a/services/core/ForwardHistorian/config +++ b/services/core/ForwardHistorian/config @@ -1,4 +1,4 @@ { "destination-vip": "tcp://127.0.0.1:22916", - "destination-serverkey": null + "destination-serverkey": KWjcDJ_SmrgvTCITd1iiJZu3NhQ5u-bsvgMVFE1YNV8 } diff --git a/volttron/platform/web/master_web_service.py b/volttron/platform/web/master_web_service.py index cd7a2db5d1..cf57ce472e 100644 --- a/volttron/platform/web/master_web_service.py +++ b/volttron/platform/web/master_web_service.py @@ -684,7 +684,7 @@ def jsonrpc(self, env, data): 'NA', INVALID_REQUEST, 'Invalid rpc data {}'.format(data)))) else: if rpcdata.params: - result_or_error = self.vip.rpc(rpcdata.id, rpcdata.method, rpcdata.params).get() + result_or_error = self.vip.rpc(rpcdata.id, rpcdata.method, **rpcdata.params).get() else: result_or_error = self.vip.rpc(rpcdata.id, rpcdata.method).get() From 058e7919801013f35178228ad93c5133185c8003 Mon Sep 17 00:00:00 2001 From: sgilbride Date: Fri, 15 May 2020 20:30:37 -0700 Subject: [PATCH 09/88] Added authentication requirement to use the API. --- volttron/platform/web/master_web_service.py | 48 ++++++++++++--------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/volttron/platform/web/master_web_service.py b/volttron/platform/web/master_web_service.py index cf57ce472e..f5d074b8e0 100644 --- a/volttron/platform/web/master_web_service.py +++ b/volttron/platform/web/master_web_service.py @@ -656,27 +656,17 @@ def jsonrpc(self, env, data): try: rpcdata = self._to_jsonrpc_obj(data) _log.info('rpc method: {}'.format(rpcdata.method)) - # Authentication url - # This does not need to be local, however for now we are going to - # make it so assuming only one level of authentication. - # auth_url = "{url_scheme}://{HTTP_HOST}/authenticate".format( - # url_scheme=env['wsgi.url_scheme'], - # HTTP_HOST=env['HTTP_HOST']) - # args = {'username': rpcdata.params['username'], - # 'password': rpcdata.params['password'], - # 'ip': env['REMOTE_ADDR']} - # resp = requests.post(auth_url, json=args, verify=False) - # - # if resp.ok and resp.text: - # from volttron.platform.web import get_bearer, NotAuthorized - # try: - # claims = self.get_user_claims(resp.text) - # except NotAuthorized: - # _log.info('Invalid username/password for {}'.format( - # rpcdata.params['username'])) - # return JsonResponse(jsonrpc.json_error( - # rpcdata.id, UNAUTHORIZED, - # "Invalid username/password specified.")) + + # Authenticate rpc call + if 'authentication' in rpcdata.params: + if self.jsonrpc_verify_and_dispatch(rpcdata.params['authentication']): + del rpcdata.params['authentication'] + else: + return JsonResponse(jsonapi.dumps(jsonrpc.json_error(rpcdata.id, UNAUTHORIZED, + "Invalid username/password specified."))) + else: + return JsonResponse(jsonapi.dumps(jsonrpc.json_error(rpcdata.id, UNAUTHORIZED, + "Authentication parameter missing."))) _log.debug('RPC METHOD IS: {}'.format(rpcdata.method)) if not rpcdata.method: @@ -723,6 +713,22 @@ def _get_jsonrpc_response(self, id, result_or_error): return jsonrpc.json_error(id, error['code'], error['message']) return jsonrpc.json_result(id, result_or_error) + def jsonrpc_verify_and_dispatch(self, authentication): + """ Verify that the user is an admin + + :param authentication: authentication generated by successful authentication + :return: Boolean + """ + from volttron.platform.web import NotAuthorized + try: + claims = self.get_user_claims(authentication) + except NotAuthorized: + _log.error("Unauthorized user attempted to connect to platform.") + return False + return True + + + @Core.receiver('onstart') def startupagent(self, sender, **kwargs): From b0b62dc195b683d42aa066e58206a1ddbae6ec8a Mon Sep 17 00:00:00 2001 From: "David M. Raker" Date: Sat, 23 May 2020 14:10:04 -0400 Subject: [PATCH 10/88] Fixed timezone in metadata to save string to database in place of pytz object. --- volttron/platform/agent/base_historian.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/volttron/platform/agent/base_historian.py b/volttron/platform/agent/base_historian.py index 0a7dd52b9e..a58e1dda08 100644 --- a/volttron/platform/agent/base_historian.py +++ b/volttron/platform/agent/base_historian.py @@ -815,7 +815,7 @@ def _capture_log_data(self, peer, sender, bus, topic, headers, message): if tz: meta['tz'] = tz elif my_tz: - meta['tz'] = my_tz + meta['tz'] = my_tz.zone self._event_queue.put({'source': 'log', 'topic': topic + '/' + point, From 4781f0703557c110eae0b4243a78f3018e6e20d4 Mon Sep 17 00:00:00 2001 From: GHYOON Date: Tue, 26 May 2020 10:06:43 +0900 Subject: [PATCH 11/88] modify return value of _get_unit --- examples/DataPublisher/datapublisher/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/DataPublisher/datapublisher/agent.py b/examples/DataPublisher/datapublisher/agent.py index 0efdb81e2d..f0aa192b42 100644 --- a/examples/DataPublisher/datapublisher/agent.py +++ b/examples/DataPublisher/datapublisher/agent.py @@ -247,7 +247,7 @@ def _get_unit(point, unittype_map): for k, v in unittype_map.items(): if re.match(k, point): return v - return 'percent' + return {'type': 'float'} def _publish_point_all(self, topic, data, meta_data, headers): # makesure topic+point gives a true value. From 9f8f712cff6c23534cface087e39e037431e77e7 Mon Sep 17 00:00:00 2001 From: sgilbride Date: Thu, 28 May 2020 20:36:35 -0700 Subject: [PATCH 12/88] Replaced required removed jsonrpc endpoint in vc. --- services/core/VolttronCentral/volttroncentral/agent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/core/VolttronCentral/volttroncentral/agent.py b/services/core/VolttronCentral/volttroncentral/agent.py index 3df119c190..a0c25b2811 100644 --- a/services/core/VolttronCentral/volttroncentral/agent.py +++ b/services/core/VolttronCentral/volttroncentral/agent.py @@ -243,6 +243,8 @@ def _configure(self, config_name, action, contents): self._authenticated_sessions = SessionHandler(Authenticate(users)) + self.vip.web.register_endpoint(r'/vc/jsonrpc', self.jsonrpc) + self.vip.web.register_websocket(r'/vc/ws', self.open_authenticate_ws_endpoint, self._ws_closed, From 2899547f9661ad965c589b2bbedae2e1ad99c021 Mon Sep 17 00:00:00 2001 From: Robert Lutes Date: Fri, 29 May 2020 17:12:58 -0700 Subject: [PATCH 13/88] Address issue when devices message is not list (all publish). --- .../core/ForwardHistorian/forwarder/agent.py | 21 ++++++++++++------- volttron/platform/agent/base_historian.py | 21 ++++++++++++------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/services/core/ForwardHistorian/forwarder/agent.py b/services/core/ForwardHistorian/forwarder/agent.py index 709996ad2b..ab2495fd65 100644 --- a/services/core/ForwardHistorian/forwarder/agent.py +++ b/services/core/ForwardHistorian/forwarder/agent.py @@ -219,18 +219,25 @@ def _capture_device_data(self, peer, sender, bus, topic, headers, message): if _filter in device: for point in point_list: # Only points in the point list will be added to the message payload - if point in message[0]: - msg[0][point] = message[0][point] - msg[1][point] = message[1][point] + if isinstance(message, list): + if point in message[0]: + msg[0][point] = message[0][point] + msg[1][point] = message[1][point] + else: + msg = None + if point in device: + msg = message + break + if (isinstance(msg, list) and not msg[0]) or \ + (isinstance(msg, (float, int, str)) and msg is None): + _log.debug("Topic: {} - is not in configured to be forwarded".format(topic)) + return else: msg = message except Exception as e: _log.debug("Error handling device_data_filter. {}".format(e)) msg = message - if not msg[0]: - _log.debug("Topic: {} - is not in configured to be forwarded".format(topic)) - else: - self.capture_data(peer, sender, bus, topic, headers, msg) + self.capture_data(peer, sender, bus, topic, headers, msg) def _capture_log_data(self, peer, sender, bus, topic, headers, message): self.capture_data(peer, sender, bus, topic, headers, message) diff --git a/volttron/platform/agent/base_historian.py b/volttron/platform/agent/base_historian.py index 5cba67807f..f75589ea72 100644 --- a/volttron/platform/agent/base_historian.py +++ b/volttron/platform/agent/base_historian.py @@ -853,18 +853,25 @@ def _capture_device_data(self, peer, sender, bus, topic, headers, if _filter in device: for point in point_list: # Only points in the point list will be added to the message payload - if point in message[0]: - msg[0][point] = message[0][point] - msg[1][point] = message[1][point] + if isinstance(message, list): + if point in message[0]: + msg[0][point] = message[0][point] + msg[1][point] = message[1][point] + else: + msg = None + if point in device: + msg = message + break + if (isinstance(msg, list) and not msg[0]) or \ + (isinstance(msg, (float, int, str)) and msg is None): + _log.debug("Topic: {} - is not in configured to be forwarded".format(topic)) + return else: msg = message except Exception as e: _log.debug("Error handling device_data_filter. {}".format(e)) msg = message - if not msg[0]: - _log.debug("Topic: {} - is not in configured to be stored in db".format(topic)) - else: - self._capture_data(peer, sender, bus, topic, headers, msg, device) + self._capture_data(peer, sender, bus, topic, headers, msg) def _capture_analysis_data(self, peer, sender, bus, topic, headers, message): From 02eedc3c17daf68d25516abfcebaef02e13a929a Mon Sep 17 00:00:00 2001 From: Robert Lutes Date: Fri, 29 May 2020 17:17:41 -0700 Subject: [PATCH 14/88] Add comments. --- services/core/ForwardHistorian/forwarder/agent.py | 5 ++++- volttron/platform/agent/base_historian.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/services/core/ForwardHistorian/forwarder/agent.py b/services/core/ForwardHistorian/forwarder/agent.py index ab2495fd65..1faebf22f5 100644 --- a/services/core/ForwardHistorian/forwarder/agent.py +++ b/services/core/ForwardHistorian/forwarder/agent.py @@ -218,15 +218,18 @@ def _capture_device_data(self, peer, sender, bus, topic, headers, message): # will be kept. if _filter in device: for point in point_list: - # Only points in the point list will be added to the message payload + # devices all publish if isinstance(message, list): + # Only points in the point list will be added to the message payload if point in message[0]: msg[0][point] = message[0][point] msg[1][point] = message[1][point] else: + # other devices publish (devices/campus/building/device/point) msg = None if point in device: msg = message + # if the point in in the parsed topic then exit for loop break if (isinstance(msg, list) and not msg[0]) or \ (isinstance(msg, (float, int, str)) and msg is None): diff --git a/volttron/platform/agent/base_historian.py b/volttron/platform/agent/base_historian.py index f75589ea72..3c90956e81 100644 --- a/volttron/platform/agent/base_historian.py +++ b/volttron/platform/agent/base_historian.py @@ -852,15 +852,18 @@ def _capture_device_data(self, peer, sender, bus, topic, headers, # will be kept. if _filter in device: for point in point_list: - # Only points in the point list will be added to the message payload + # devices all publish if isinstance(message, list): + # Only points in the point list will be added to the message payload if point in message[0]: msg[0][point] = message[0][point] msg[1][point] = message[1][point] else: + # other devices publish (devices/campus/building/device/point) msg = None if point in device: msg = message + # if the point in in the parsed topic then exit for loop break if (isinstance(msg, list) and not msg[0]) or \ (isinstance(msg, (float, int, str)) and msg is None): From 66b67e2cfba2c16ee66779e804abc1592c697019 Mon Sep 17 00:00:00 2001 From: Robert Lutes Date: Fri, 29 May 2020 18:23:19 -0700 Subject: [PATCH 15/88] Add device to call to _capture_data. --- volttron/platform/agent/base_historian.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/volttron/platform/agent/base_historian.py b/volttron/platform/agent/base_historian.py index 3c90956e81..5c13785022 100644 --- a/volttron/platform/agent/base_historian.py +++ b/volttron/platform/agent/base_historian.py @@ -874,7 +874,7 @@ def _capture_device_data(self, peer, sender, bus, topic, headers, except Exception as e: _log.debug("Error handling device_data_filter. {}".format(e)) msg = message - self._capture_data(peer, sender, bus, topic, headers, msg) + self._capture_data(peer, sender, bus, topic, headers, msg, device) def _capture_analysis_data(self, peer, sender, bus, topic, headers, message): From 0b349de4a2b13a8522f5aac44738484c2796e574 Mon Sep 17 00:00:00 2001 From: Robert Lutes Date: Fri, 29 May 2020 18:27:02 -0700 Subject: [PATCH 16/88] Update comment. --- volttron/platform/agent/base_historian.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/volttron/platform/agent/base_historian.py b/volttron/platform/agent/base_historian.py index 5c13785022..6595809d97 100644 --- a/volttron/platform/agent/base_historian.py +++ b/volttron/platform/agent/base_historian.py @@ -867,7 +867,7 @@ def _capture_device_data(self, peer, sender, bus, topic, headers, break if (isinstance(msg, list) and not msg[0]) or \ (isinstance(msg, (float, int, str)) and msg is None): - _log.debug("Topic: {} - is not in configured to be forwarded".format(topic)) + _log.debug("Topic: {} - is not in configured to be stored".format(topic)) return else: msg = message From 694383ee05139068146f021d19f80143f98011f6 Mon Sep 17 00:00:00 2001 From: jklarson Date: Mon, 1 Jun 2020 12:18:07 -0700 Subject: [PATCH 17/88] First pass at requested changes for Ecobee --- .../master_driver/interfaces/ecobee.py | 764 ++++++++---------- 1 file changed, 346 insertions(+), 418 deletions(-) diff --git a/services/core/MasterDriverAgent/master_driver/interfaces/ecobee.py b/services/core/MasterDriverAgent/master_driver/interfaces/ecobee.py index 5488557133..7e5587fdc9 100644 --- a/services/core/MasterDriverAgent/master_driver/interfaces/ecobee.py +++ b/services/core/MasterDriverAgent/master_driver/interfaces/ecobee.py @@ -47,15 +47,12 @@ from volttron.platform.jsonrpc import RemoteError from volttron.platform.agent.known_identities import CONFIGURATION_STORE, PLATFORM_DRIVER +AUTH_CONFIG_PATH = "drivers/auth/ecobee_{}" +THERMOSTAT_URL = 'https://api.ecobee.com/1/thermostat' + _log = logging.getLogger(__name__) __version__ = "1.0" -THERMOSTAT_URL = 'https://api.ecobee.com/1/thermostat' -THERMOSTAT_HEADERS = { - 'Content-Type': 'application/json;charset=UTF-8', - 'Authorization': 'Bearer {}' -} - class Interface(BasicRevert, BaseInterface): """ @@ -65,180 +62,73 @@ class Interface(BasicRevert, BaseInterface): def __init__(self, **kwargs): super(Interface, self).__init__(**kwargs) # Configuration value defaults - self.config_dict = None - self.cache_identity = None - self.ecobee_id = None - self.group_id = None - self.api_key = None + self.config_dict = {} + self.api_key = "" + self.ecobee_id = -1 + self.group_id = "" + # Authorization tokens self.refresh_token = None self.access_token = None self.authorization_code = None - self.pin = None - self.authenticated = False - # Config name for updating config during auth update - self.config_name = None + # Config path for storing Ecobee auth information in config store, not user facing + self.auth_config_path = "" # Un-initialized data response from Driver Cache agent self.ecobee_data = None - # Ecobee registers are of non-standard datatypes, so override existing register type dictionary + # Ecobee registers are of non-standard data types, so override existing register type dictionary self.registers = { ('hold', False): [], ('hold', True): [], ('setting', False): [], ('setting', True): [], - ('status', True): [], + # ('status', True): [], ('vacation', False): [], ('programs', False): [] } + # Un-initialized greenlet for querying cache agent + self.authorization_stage = "UNAUTHORIZED" self.poll_greenlet = None def configure(self, config_dict, registry_config_str): - """ - Configure agent with tokens and polling parameters - :param config_dict: Device configuration dictionary from config store - :param registry_config_str: registry configuration string from config store - """ - # populate class values from configuration store - _log.debug("Starting Ecobee driver configuration.") - self.config_dict = config_dict - self.cache_identity = config_dict.get("CACHE_IDENTITY") - if not self.cache_identity: - raise ValueError( - "Ecobee configuration requires identity of Driver HTTP Cache Agent installed on the platform.") - self.api_key = config_dict.get('API_KEY') - self.refresh_token = config_dict.get('REFRESH_TOKEN') - self.access_token = config_dict.get('ACCESS_TOKEN') - self.authorization_code = config_dict.get('AUTHORIZATION_CODE') - self.pin = config_dict.get("PIN") - self.ecobee_id = config_dict.get('DEVICE_ID') - self.group_id = config_dict.get("GROUP_ID", "default") - self.config_name = config_dict.get("config_name") - if not isinstance(self.config_name, str): - raise ValueError("Ecobee driver requires config_name string in driver configuration for authentication " - "updates") - if not isinstance(self.ecobee_id, int): + self.config_dict.update(config_dict) + ecobee_id = self.config_dict.get('DEVICE_ID') + if not isinstance(ecobee_id, int): try: - self.ecobee_id = int(self.ecobee_id) + self.ecobee_id = int(ecobee_id) except ValueError: raise ValueError( "Ecobee driver requires Ecobee device identifier as int, got: {}".format(self.ecobee_id)) - # Update auth tokens as necessary using Ecobee API endpoints - if self.authorization_code is not None: - _log.info("Ecobee using existing Ecobee authorization code.") - self.authenticated = True - try: - self.refresh_tokens() - except Exception as rf: - _log.debug("Failed to refresh tokens with existing auth key: {}. refreshing auth code and trying again" - ".".format(rf)) - self.request_pin() - else: - _log.warning("Ecobee failed to authenicate, refreshing tokens...") - self.authenticated = False - self.access_token = '' - self.authorization_code = '' - self.refresh_token = '' - self.request_pin() - # then parse the driver's registry configuration + self.group_id = self.config_dict.get("GROUP_ID", "default") + self.auth_config_path = AUTH_CONFIG_PATH.format(self.group_id) self.parse_config(registry_config_str) - # Spawn a periodic greenlet to make sure we're always updated with the most recent Ecobee API data + + # Fetch any stored configuration values to reuse + stored_auth_config = self.get_auth_config_from_store() + # Do some minimal checks on auth + if stored_auth_config: + if stored_auth_config.get("AUTH_CODE"): + self.authorization_code = stored_auth_config.get("AUTH_CODE") + self.authorization_stage = "REQUEST_TOKENS" + if stored_auth_config.get("ACCESS_TOKEN") and stored_auth_config.get("REFRESH_TOKEN"): + self.access_token = stored_auth_config.get("ACCESS_TOKEN") + self.refresh_token = stored_auth_config.get("REFRESH_TOKEN") + self.get_ecobee_data() + self.authorization_stage = "AUTHORIZED" + if self.authorization_stage != "AUTHORIZED": + if self.authorization_stage == "REQUEST_TOKENS": + try: + self.update_authorization() + except (ConnectionError, NewConnectionError) as ce: + _log.error("Error requesting tokens during configuration: {}. Resetting Ecobe authorization".format( + ce)) + self.authorization_stage = "UNAUTHORIZED" + if self.authorization_stage == "UNAUTHORIZED": + self.update_authorization() + if not self.poll_greenlet: self.poll_greenlet = self.core.periodic(180, self.get_ecobee_data) _log.debug("Ecobee configuration complete.") - def refresh_tokens(self): - """ - Refresh Ecobee API authentication tokens via API endpoint - asks Ecobee to reset tokens then updates config with - new tokens from Ecobee - """ - _log.debug('Refreshing Ecobee auth tokens.') - url = 'https://api.ecobee.com/token' - params = { - 'grant_type': 'refresh_token', - 'refresh_token': self.refresh_token, - 'client_id': self.api_key - } - if not self.pin: - raise ValueError("Ecobee pin required for refreshing tokens.") - # Generate auth request and extract returned value - response = make_ecobee_request("POST", url, data=params) - for token in 'access_token', 'refresh_token': - if token not in response: - raise RuntimeError("Ecobee response did not contain token{}:, response was {}".format(token, response)) - self.access_token = response['access_token'] - _log.debug("Ecobee access token: {}".format(self.access_token)) - self.refresh_token = response['refresh_token'] - _log.debug("Ecobee refresh token: {}".format(self.refresh_token)) - - def request_pin(self): - """ - Request new application pin from Ecobee API, then updates agent configuration via config store - - NOTE: This endpoint is currently broken based on information gathered from stack exchange! - """ - # Generate auth request and return extracted values - let the user know the pin has to be updated manually using - # the Ecobee Web UI - _log.debug("Requesting new Ecobee pin.") - url = 'https://api.ecobee.com/authorize' - params = { - 'response_type': 'ecobeePin', - 'client_id': self.api_key, - 'scope': 'smartWrite' - } - try: - response = make_ecobee_request("GET", url, params=params) - except RuntimeError as re: - _log.error(re) - _log.warning("Error connecting to Ecobee. Possible connectivity outage. Could not request pin.") - return - for auth_item in ['code', 'ecobeePin']: - if auth_item not in response: - raise RuntimeError("Ecobee authorization response was missing required item: {}, response contained {]". - format(auth_item, response)) - self.authorization_code = response.get('code') - self.pin = response.get('ecobeePin') - _log.warning("***********************************************************") - _log.warning( - 'Please authorize your ecobee developer app with PIN code {}.\nGo to ' - 'https://www.ecobee.com/consumerportal /index.html, click My Apps, Add application, Enter Pin and click ' - 'Authorize.'.format(self.pin)) - _log.warning("***********************************************************") - _log.info("New Ecobee authorization code: {}".format(self.authorization_code)) - - # Allow the user some time to add the application pin through the Ecobee Web UI - gevent.sleep(60) - # Now that we have a new pin to use, refresh the auth tokens - self.request_tokens() - - def request_tokens(self): - """ - Request up to date Auth tokens from Ecobee using API key and authorization code - """ - # Generate auth request and extract returned value - _log.debug("Requesting new auth tokens from Ecobee.") - url = 'https://api.ecobee.com/token' - params = { - 'grant_type': 'ecobeePin', - 'code': self.authorization_code, - 'client_id': self.api_key - } - try: - response = make_ecobee_request("POST", url, data=params) - except RuntimeError as re: - _log.error(re) - _log.warning("Error connecting to Ecobee. Possible connectivity outage. Could not request tokens.") - return - self.authenticated = True - for token in ["access_token", "refresh_token"]: - if token not in response: - raise RuntimeError("Request tokens response did not contain {}, cannot connect to remote Ecobee API " - "until tokens have been successfully obtained from remote") - self.access_token = response.get('access_token') - self.refresh_token = response.get('refresh_token') - _log.info("New Ecobee access token: {}".format(self.access_token)) - _log.info("New Ecobee refresh token: {}".format(self.refresh_token)) - self.update_config() - def parse_config(self, config_dict): """ Parse driver registry configuration and create device registers @@ -291,8 +181,8 @@ def parse_config(self, config_dict): # Each Ecobee thermostat has one Status reporting "register", one programs register and one vacation "register # Status is a static point which reports a list of running HVAC systems reporting to the thermostat - status_register = Status(self.ecobee_id, self.access_token) - self.insert_register(status_register) + # status_register = Status(self.ecobee_id, self.access_token) + # self.insert_register(status_register) # Vacation can be used to manage all Vacation programs for the thermostat vacation_register = Vacation(self.ecobee_id, self.access_token) @@ -302,31 +192,128 @@ def parse_config(self, config_dict): program_register = Program(self.ecobee_id, self.access_token) self.insert_register(program_register) - def update_config(self): + def update_authorization(self): + if self.authorization_stage == "UNAUTHORIZED": + self.authorize_application() + if self.authorization_stage == "REQUEST_TOKENS": + self.request_tokens() + if self.authorization_stage == "REFRESH_TOKENS": + self.refresh_tokens() + self.update_auth_config() + + def authorize_application(self): + auth_url = "https://api.ecobee.com/authorize" + params = { + "response_type": "ecobeePin", + "client_id": self.api_key, + "scope": "smartWrite" + } + try: + response = make_ecobee_request("GET", auth_url, params=params) + except RuntimeError as re: + _log.error(re) + _log.warning("Error connecting to Ecobee. Possible connectivity outage. Could not request pin.") + return + for auth_item in ['code', 'ecobeePin']: + if auth_item not in response: + raise RuntimeError("Ecobee authorization response was missing required item: {}, response contained {}". + format(auth_item, response)) + self.authorization_code = response.get('code') + pin = response.get('ecobeePin') + _log.warning("***********************************************************") + _log.warning( + 'Please authorize your ecobee developer app with PIN code {}.\nGo to ' + 'https://www.ecobee.com/consumerportal /index.html, click My Apps, Add application, Enter Pin and click ' + 'Authorize.'.format(pin)) + _log.warning("***********************************************************") + self.authorization_stage = "REQUEST_TOKENS" + gevent.sleep(60) + + def request_tokens(self): + """ + Request up to date Auth tokens from Ecobee using API key and authorization code + """ + # Generate auth request and extract returned value + _log.debug("Requesting new auth tokens from Ecobee.") + url = 'https://api.ecobee.com/token' + params = { + 'grant_type': 'ecobeePin', + 'code': self.authorization_code, + 'client_id': self.api_key + } + response = make_ecobee_request("POST", url, data=params) + for token in ["access_token", "refresh_token"]: + if token not in response: + raise RuntimeError("Request tokens response did not contain {}, cannot connect to remote Ecobee API " + "until tokens have been successfully obtained from remote") + self.access_token = response.get('access_token') + self.refresh_token = response.get('refresh_token') + self.authorization_stage = "AUTHORIZED" + + def refresh_tokens(self): + """ + Refresh Ecobee API authentication tokens via API endpoint - asks Ecobee to reset tokens then updates config with + new tokens from Ecobee + """ + _log.debug('Refreshing Ecobee auth tokens.') + url = 'https://api.ecobee.com/token' + params = { + 'grant_type': 'refresh_token', + 'refresh_token': self.refresh_token, + 'client_id': self.api_key + } + if not self.pin: + raise ValueError("Ecobee pin required for refreshing tokens.") + # Generate auth request and extract returned value + response = make_ecobee_request("POST", url, data=params) + for token in 'access_token', 'refresh_token': + if token not in response: + raise RuntimeError("Ecobee response did not contain token{}:, response was {}".format(token, response)) + self.access_token = response['access_token'] + self.refresh_token = response['refresh_token'] + self.authorization_stage = "AUTHORIZED" + + def update_auth_config(self): """ Update the master driver configuration for this device with new values from auth functions """ - _log.debug("Updating configuration with new Ecobee auth tokens.") - self.config_dict["AUTHORIZATION_CODE"] = self.authorization_code - self.config_dict["ACCESS_TOKEN"] = self.access_token - self.config_dict["REFRESH_TOKEN"] = self.refresh_token - self.config_dict["PIN"] = self.pin - # Fetch existing driver configuration from config store - driver_config = json.loads( - self.vip.rpc.call(CONFIGURATION_STORE, "manage_get", PLATFORM_DRIVER, self.config_name).get()) - # update driver configuration with new values from Ecobee remote - driver_config["driver_config"].update(self.config_dict) - # Config store update RPC call to update device configuration - self.vip.rpc.call(CONFIGURATION_STORE, "set_config", self.config_name, driver_config, trigger_callback=False, - send_update=True).get(timeout=3) - - def get_ecobee_data(self, refresh=False, retry=True): + auth_config = {"AUTH_CODE": self.authorization_code, + "ACCESS_TOKEN": self.access_token, + "REFRESH_TOKEN": self.refresh_token} + _log.debug("Updating Ecobee auth configuration with new tokens.") + self.vip.rpc.call(CONFIGURATION_STORE, "set_config", self.auth_config_path, auth_config, trigger_callback=False, + send_update=False).get(timeout=3) + + def get_auth_config_from_store(self): + return json.loads(self.vip.rpc.call(CONFIGURATION_STORE, "manage_get", PLATFORM_DRIVER, self.auth_config_path)) + + def get_ecobee_data(self): + """ + Request data from cache, updating auth if Refresh tokens are out of date + """ + try: + self.get_ecobee_data_from_cache() + self.authorization_stage = "AUTHORIZED" + except RemoteError as re: + _log.error("Failed to obtain Ecobee data from cache: {}. Refreshing tokens and trying again".format(re)) + try: + self.authorization_stage = "REFRESH_TOKENS" + self.update_authorization() + self.get_ecobee_data_from_cache() + except RemoteError as re: + _log.error(re) + _log.error("Failed to obtain Ecobee data from cache after refreshing tokens, obtaining new " + "authorization code") + # TODO do we update auth and try again, or throw an error + self.authorization_stage = "UNAUTHORIZED" + self.update_authorization() + + def get_ecobee_data_from_cache(self, refresh=False): """ Request most recent Ecobee data from Driver Cache agent - this prevents overwhelming remote API with data requests and or incurring excessive costs :param refresh: If true, the Driver HTTP Cache will skip cached data and try to query the API, may not return data if the remote rejects due to timing or cost constraints - :param retry: If true try fetching data from cache agent again """ # Generate request information to pass along to cache agent headers = json.dumps({ @@ -343,23 +330,13 @@ def get_ecobee_data(self, refresh=False, retry=True): }) # ask the cache for the most recent API data self.ecobee_data = None - try: - data = self.vip.rpc.call( - self.cache_identity, "driver_data_get", "ecobee", self.group_id, THERMOSTAT_URL, headers, - update_frequency=180, params=params, refresh=refresh).get() - if data is None: - raise RuntimeError("No Ecobee data available from Driver HTTP Cache Agent.") - _log.info("Last Ecobee data update occurred: {}".format(data.get("request_timestamp"))) - self.ecobee_data = data.get("request_response") - except RemoteError: - if retry: - _log.warning("Failed to get Ecobee data from Driver HTTP Cache Agent, refreshing tokens and trying " - "again.") - self.refresh_tokens() - self.get_ecobee_data(refresh=refresh, retry=False) - else: - raise RuntimeError("Failed to get Ecobee data from Driver Cache after refreshing tokens. May be " - "experiencing connection issues or Ecobee API may be down.") + data = self.vip.rpc.call( + self.cache_identity, "driver_data_get", "ecobee", self.group_id, THERMOSTAT_URL, headers, + update_frequency=180, params=params, refresh=refresh).get() + if data is None: + raise RuntimeError("No Ecobee data available from Driver HTTP Cache Agent.") + _log.info("Last Ecobee data update occurred: {}".format(data.get("request_timestamp"))) + self.ecobee_data = data.get("request_response") def get_point(self, point_name, **kwargs): """ @@ -369,11 +346,34 @@ def get_point(self, point_name, **kwargs): """ # Find the named register and get its current state from the periodic Ecobee API data register = self.get_register_by_name(point_name) - try: - return register.get_state(self.ecobee_data) - except: - self.refresh_tokens() - return register.get_state(self.ecobee_data) + return register.get_state(self.ecobee_data) + + def _scrape_all(self): + """ + Fetch point data for all configured points + :return: dictionary of most recent data for all points configured for the driver + """ + result = {} + # Get static registers + programs_register = self.get_register_by_name("Programs") + vacations_register = self.get_register_by_name("Vacations") + status_register = self.get_register_by_name("Status") + # Get all holds + holds = self.get_registers_by_type("hold", True) + self.get_registers_by_type("hold", False) + # Get all settings + settings = self.get_registers_by_type("setting", True) + self.get_registers_by_type("setting", False) + registers = holds + settings + [programs_register, vacations_register, status_register] + # Add data for all holds and settings to our results + for register in registers: + if register.readable: + try: + register_data = register.get_state(self.ecobee_data) + if isinstance(register_data, dict): + result.update(register_data) + else: + result[register.point_name] = register_data + except RuntimeError as re: + _log.warning(re) def _set_point(self, point_name, value, **kwargs): """ @@ -386,15 +386,16 @@ def _set_point(self, point_name, value, **kwargs): register = self.get_register_by_name(point_name) if register.read_only: raise IOError("Trying to write to a point configured read only: {}".format(point_name)) - if register.register_type not in ["setting", "hold", "vacation", "programs"]: - raise RuntimeError("Register {} type {} does not support set_point".format(register.point_name, - register.register_type)) + if register.register_type == "status": + raise RuntimeError("Status register does not support set_point") try: if register.register_type == "setting" or register.register_type == "hold": register.set_state(value) elif register.register_type in ["vacation", "programs"]: register.set_state(value, **kwargs) - except: + except (RemoteError, ConnectionError) as err: + _log.error("Error setting Ecobee point: {}. Refreshing tokens and sending again".format(err)) + self.authorization_stage = "REFRESH_TOKENS" self.refresh_tokens() if register.register_type == "setting" or register.register_type == "hold": register.set_state(value) @@ -403,50 +404,6 @@ def _set_point(self, point_name, value, **kwargs): if register.readable: return register.get_state(self.ecobee_data) - def _scrape_all(self): - """ - Fetch point data for all configured points - :return: dictionary of most recent data for all points configured for the driver - """ - result = {} - # Get static registers - programs_register = self.get_register_by_name("Programs") - vacations_register = self.get_register_by_name("Vacations") - status_register = self.get_register_by_name("Status") - # Get all holds, filter holds that aren't readable points - holds = self.get_registers_by_type("hold", True) + self.get_registers_by_type("hold", False) - holds = [register for register in holds if register.readable] - # Get all settings, filter settings that aren't readable points - settings = self.get_registers_by_type("setting", True) + self.get_registers_by_type("setting", False) - settings = [register for register in settings if register.readable] - registers = holds + settings - # Add data for all holds and settings to our results - for register in registers: - try: - register_data = register.get_state(self.ecobee_data) - if isinstance(register_data, dict): - result.update(register_data) - else: - result[register.point_name] = register_data - except RuntimeError as re: - _log.warning(re) - # include device status in the results - try: - result[status_register.point_name] = status_register.get_state() - except RuntimeError as re: - _log.warning(re) - # include any vacations scheduled for the device - try: - result[vacations_register.point_name] = vacations_register.get_state(self.ecobee_data) - except RuntimeError as re: - _log.warning(re) - # include any scheduled programs for the device - try: - result[programs_register.point_name] = programs_register.get_state(self.ecobee_data) - except RuntimeError as re: - _log.warning(re) - return result - class Setting(BaseRegister): """ @@ -465,20 +422,21 @@ def set_state(self, value): :param value: Arbitrarily specified value to request as set point :return: request response values from settings request """ - if self.read_only: - raise RuntimeError("Attempted write of read-only register {}".format(self.point_name)) - # Generate set state request content and send request - params = {"format": "json"} - thermostat_body = { - "thermostat": { - "settings": { - self.point_name: value - } - } - } - headers, body = generate_set_point_request_objects(self.access_token, "thermostats", self.thermostat_id, - thermostat_body) - return make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, data=params, json=body) + # if self.read_only: + # raise RuntimeError("Attempted write of read-only register {}".format(self.point_name)) + # # Generate set state request content and send request + # params = {"format": "json"} + # thermostat_body = { + # "thermostat": { + # "settings": { + # self.point_name: value + # } + # } + # } + # headers, body = generate_set_point_request_objects(self.access_token, "thermostats", self.thermostat_id, + # thermostat_body) + # return make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, data=params, json=body) + pass def get_state(self, ecobee_data): """ @@ -519,25 +477,26 @@ def set_state(self, value): information for each hold. :return: request response values from settings request """ - if not isinstance(value, dict): - raise ValueError("Hold register set_state expects dict, received {}".format(type(value))) - if "holdType" not in value: - raise ValueError('Hold register requires "holdType" in value dict') - if self.point_name not in value: - raise ValueError("Point name {} not found in Hold set_state value dict") - # Generate set state request content and send reques - params = {"format": "json"} - function_body = { - "functions": [ - { - "type": "setHold", - "params": value - } - ] - } - headers, body = generate_set_point_request_objects(self.access_token, "thermostats", self.thermostat_id, - function_body) - return make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, data=params, json=body) + # if not isinstance(value, dict): + # raise ValueError("Hold register set_state expects dict, received {}".format(type(value))) + # if "holdType" not in value: + # raise ValueError('Hold register requires "holdType" in value dict') + # if self.point_name not in value: + # raise ValueError("Point name {} not found in Hold set_state value dict") + # # Generate set state request content and send reques + # params = {"format": "json"} + # function_body = { + # "functions": [ + # { + # "type": "setHold", + # "params": value + # } + # ] + # } + # headers, body = generate_set_point_request_objects(self.access_token, "thermostats", self.thermostat_id, + # function_body) + # return make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, data=params, json=body) + pass def get_state(self, ecobee_data): """ @@ -583,28 +542,29 @@ def get_state(self): :return: List of currently running equipment connected to Ecobee thermostat """ # Generate set state request content and send request - status_url = "https://api.ecobee.com/1/thermostatSummary" - headers = generate_thermostat_headers(self.access_token) - params = { - 'json': json.dumps({ - "selection": { - "selectionType": "registered", - "selectionMatch": "", - "includeEquipmentStatus": True - } - }) - } - status_message = make_ecobee_request("GET", status_url, headers=headers, params=params) - # Parse the status from the request response - if not status_message: - raise RuntimeError( - "No response data from Ecobee thermostat summary endpoint, could not get thermostat status") - for status_line in status_message["statusList"]: - thermostat, running_equipment = status_line.split(":") - if int(thermostat) == self.thermostat_id: - return running_equipment.split(",") - raise RuntimeError("Could not find status for Ecobee device {} in thermostat summary".format( - self.thermostat_id)) + # status_url = "https://api.ecobee.com/1/thermostatSummary" + # headers = generate_thermostat_headers(self.access_token) + # params = { + # 'json': json.dumps({ + # "selection": { + # "selectionType": "registered", + # "selectionMatch": "", + # "includeEquipmentStatus": True + # } + # }) + # } + # status_message = make_ecobee_request("GET", status_url, headers=headers, params=params) + # # Parse the status from the request response + # if not status_message: + # raise RuntimeError( + # "No response data from Ecobee thermostat summary endpoint, could not get thermostat status") + # for status_line in status_message["statusList"]: + # thermostat, running_equipment = status_line.split(":") + # if int(thermostat) == self.thermostat_id: + # return running_equipment.split(",") + # raise RuntimeError("Could not find status for Ecobee device {} in thermostat summary".format( + # self.thermostat_id)) + pass # TODO deleting a vacation is currently broken @@ -630,59 +590,60 @@ def set_state(self, vacation, delete=False): :param vacation: Vacation name for delete, or vacation object dictionary for create :param delete: Whether to delete the named vacation """ - if delete: - if isinstance(vacation, dict): - vacation = vacation.get("name") - if not vacation: - raise ValueError('Deleting vacation on Ecobee thermostat requires either vacation name string or ' - 'dict with "name" string') - _log.debug("Creating Ecobee vacation deletion request") - # Generate and send delete vacation request to remote API - params = {"format": "json"} - function_body = { - "functions": [ - { - "type": "deleteVacation", - "params": { - "name": vacation - } - } - ] - } - headers, body = generate_set_point_request_objects(self.access_token, "registered", "", function_body) - make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, data=params, json=body) - else: - # Do some basic format validation for vacation dict, but user is ultimately responsible for formatting - # Ecobee API docs describe expected format, link provided below - valid_vacation = True - required_items = ["name", "coolHoldTemp", "heatHoldTemp", "startDate", "startTime", "endDate", "endTime"] - if not isinstance(vacation, dict): - valid_vacation = False - else: - for item in required_items: - if item not in vacation: - valid_vacation = False - break - if not valid_vacation: - raise ValueError('Creating vacation on Ecobee thermostat requires dict: {"name": , ' - '"coolHoldTemp": , "heatHoldTemp": , "startDate": , ' - '"startTime":