diff --git a/examples/SCPAgent/README.rst b/examples/SCPAgent/README.rst new file mode 100644 index 0000000000..66e7f976bf --- /dev/null +++ b/examples/SCPAgent/README.rst @@ -0,0 +1,69 @@ +SCP Agent +========= + +The purpose of this example agent is to demonstrate secure copy of files from/to +external resources. SCP uses ssh protocol for creating an encrypted connection +between the agent and the resources. + +Configuration +------------- + +The SCP Agent requires a few configuration elements in order for the agent to run. + +.. csv-table:: Configuration Table + :header: "Parameter", "Example", "Description" + :widths: 15, 15, 30 + + "ssh_id", "~/.ssh/id_rsa", "Path to the identity file to allow connectivity from the host to remote communication" + "remote_user", "user@remote.com", "The user and resolvable host for connecting to" + +Interfaces +---------- + +The SCP Agent has both a pubsub and rpc base interfaces. + +RPC Interface +~~~~~~~~~~~~~ + +There are two methods available for the rpc interface the difference between the two +is the direction of the file exchange. + +.. code-block::python + + result = agent.vip.rpc.call("scp.agent", "trigger_download", + remote_path="/home/osboxes/Downloads/f2.txt", + local_path="/home/osboxes/Desktop/f6.txt").get(timeout=10) + + result = agent.vip.rpc.call("scp.agent", "trigger_upload", + remote_path="/home/osboxes/Downloads/f6.txt", + local_path="/home/osboxes/Desktop/f6.txt").get(timeout=10) + +PubSub Interface +~~~~~~~~~~~~~~~~ + +The pubsub interface requires sending of path through the pubsub subsystem. The pubsub requires either a +json string or dictionary be sent across the message bus to the agent on the transfer topic will start +the scp transfer. + +.. code-block::python + + agent.vip.pubsub.publish(peer='pubsub', topic="transfer", message=dict(remote_path=remote_path, + local_path=local_path, + direction="SENDING")).get(timeout=5) + + agent.vip.pubsub.publish(peer='pubsub', topic="transfer", message=dict(remote_path=remote_path, + local_path=local_path, + direction="RECEIVING")).get(timeout=5) + + +Testing +------- + +Within the agent directory there is a trigger_scp.py script. By default the trigger will run through 4 different +tests. The tests will exercise the sending and receiving for both the rpc and pubsub interfaces. The trigger will +require user interaction so run it with a shell that can receive input. + +.. code-block::shell + + (volttron) (base) osboxes@osboxes:~/repos/volttron$ python examples/SCPAgent/trigger_scp.py + diff --git a/examples/SCPAgent/config.yml b/examples/SCPAgent/config.yml new file mode 100644 index 0000000000..f959262e95 --- /dev/null +++ b/examples/SCPAgent/config.yml @@ -0,0 +1,3 @@ +--- +ssh_id: ~/.ssh/id_rsa +remote_user: osboxes@localhost diff --git a/examples/SCPAgent/scp/__init__.py b/examples/SCPAgent/scp/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/SCPAgent/scp/agent.py b/examples/SCPAgent/scp/agent.py new file mode 100644 index 0000000000..a9fbf46beb --- /dev/null +++ b/examples/SCPAgent/scp/agent.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- {{{ +# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et: +# +# Copyright 2019, Battelle Memorial Institute. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This material was prepared as an account of work sponsored by an agency of +# the United States Government. Neither the United States Government nor the +# United States Department of Energy, nor Battelle, nor any of their +# employees, nor any jurisdiction or organization that has cooperated in the +# development of these materials, makes any warranty, express or +# implied, or assumes any legal liability or responsibility for the accuracy, +# completeness, or usefulness or any information, apparatus, product, +# software, or process disclosed, or represents that its use would not infringe +# privately owned rights. Reference herein to any specific commercial product, +# process, or service by trade name, trademark, manufacturer, or otherwise +# does not necessarily constitute or imply its endorsement, recommendation, or +# favoring by the United States Government or any agency thereof, or +# Battelle Memorial Institute. The views and opinions of authors expressed +# herein do not necessarily state or reflect those of the +# United States Government or any agency thereof. +# +# PACIFIC NORTHWEST NATIONAL LABORATORY operated by +# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY +# under Contract DE-AC05-76RL01830 +# }}} + +from enum import Enum, auto +import inspect +from json import JSONDecodeError +import logging +from pathlib import Path + +from gevent import subprocess + +from volttron.platform import jsonapi +from volttron.platform.vip.agent import Agent, Core, RPC +from volttron.platform.agent.utils import vip_main, load_config + + +__version__ = "0.1" +logging.basicConfig(level=logging.DEBUG) +_log = logging.getLogger(inspect.getmodulename(__file__)) + + +class WhichWayEnum(Enum): + SENDING = auto() + RECEIVING = auto() + + +class ScpAgent(Agent): + def __init__(self, config_path, **kwargs): + super(ScpAgent, self).__init__(**kwargs) + config = load_config(config_path) + self._remote_user = None + self._ssh_id = None + self.default_config = dict( + ssh_id=config.get("ssh_id"), # "~/.ssh/id_rsa", + remote_user=config.get("remote_user") # "osboxes@localhost" + ) + self.vip.config.set_default("config", self.default_config) + self.vip.config.subscribe(self.configure, + actions=["NEW", "UPDATE"], pattern="config") + self._subscribed = False + + def configure(self, config_name, action, contents): + conf = {} + conf.update(contents) + self._ssh_id = conf.get("ssh_id") + self._remote_user = conf.get('remote_user') + + if not self._subscribed: + self.vip.pubsub.subscribe(peer="pubsub", prefix="transfer", callback=self.transfer_file) + self._subscribed = True + + def transfer_file(self, peer, sender, bus, topic, headers, message): + """ + Pubsub interface for transferring files. + + The interface requires message to be a dictionary like object + or a json serializable string with the following required structure: + + { + "direction": "SENDING" + "remote_path": "/remote/path/file.txt", + "local_path": "/local/path/file.txt" + } + + The above direction must be either "SENDING" or "RECEIVING". The path must be available + on the host that is providing the content and will overwrite the data on the receiving + side of the connection. + + """ + enabled = self.__check_configuration__() + + if not enabled: + return False + + if isinstance(message, str): + try: + message = jsonapi.loads(message) + except JSONDecodeError: + _log.error(f"Invalid json passed through string interface") + return + + direction = message.get("direction") + remote_path = message.get("remote_path") + local_path = message.get("local_path") + + enabled = True + if not remote_path: + enabled = False + _log.error(f"remote_path not specified in message to pub sub") + + if not local_path: + enabled = False + _log.error(f"local_path not specified in message to pub sub") + + if direction not in WhichWayEnum.__members__: + _log.error(f"which_way must be either SENDING or RECEIVING.") + enabled = False + + if not enabled: + return + + if direction == WhichWayEnum.SENDING.name: + success = self.__handle_scp__(WhichWayEnum.SENDING, local_path, remote_path) + else: + success = self.__handle_scp__(WhichWayEnum.RECEIVING, remote_path, local_path) + + if not success: + _log.error(f"Unable to send to/recieve scp files.") + + @RPC.export + def trigger_download(self, remote_path, local_path): + _log.debug('Triggering download') + enabled = self.__check_configuration__() + + if not enabled: + return False + + return self.__handle_scp__(WhichWayEnum.RECEIVING, remote_path, local_path) + + @RPC.export + def trigger_upload(self, local_path, remote_path): + _log.debug('Trigger upload') + enabled = self.__check_configuration__() + + if not enabled: + return False + + return self.__handle_scp__(WhichWayEnum.SENDING, local_path, remote_path) + + def __check_configuration__(self): + enabled = True + if self._ssh_id is None: + _log.error("Configuration error, ssh_id is not set") + enabled = False + if self._remote_user is None: + _log.error("Configuration error, invalid remote user configured") + enabled = False + return enabled + + def __handle_scp__(self, which_way: WhichWayEnum, from_arg, to_arg): + cmd = ["scp", "-o", "LogLevel=VERBOSE", + "-o", "PasswordAuthentication=no", + "-o", "IdentitiesOnly=yes", + "-o", "Compression=yes", + "-i", self._ssh_id] + + if which_way == WhichWayEnum.SENDING: + cmd.extend([f"{from_arg}", f"{self._remote_user}:{to_arg}"]) + else: + cmd.extend([f"{self._remote_user}:{from_arg}", f"{to_arg}"]) + + p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE) + p.wait() + _log.debug(p.stderr.read().decode('utf-8')) + _log.debug(p.stdout.read().decode('utf-8')) + _log.debug(f"Complete {which_way.name}") + _log.debug(f"Return code: {p.returncode}") + if p.returncode == 0: + return True + return False + + +if __name__ == '__main__': + vip_main(ScpAgent, version=__version__) diff --git a/examples/SCPAgent/trigger_scp.py b/examples/SCPAgent/trigger_scp.py new file mode 100644 index 0000000000..3782ed8b68 --- /dev/null +++ b/examples/SCPAgent/trigger_scp.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- {{{ +# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et: +# +# Copyright 2019, Battelle Memorial Institute. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This material was prepared as an account of work sponsored by an agency of +# the United States Government. Neither the United States Government nor the +# United States Department of Energy, nor Battelle, nor any of their +# employees, nor any jurisdiction or organization that has cooperated in the +# development of these materials, makes any warranty, express or +# implied, or assumes any legal liability or responsibility for the accuracy, +# completeness, or usefulness or any information, apparatus, product, +# software, or process disclosed, or represents that its use would not infringe +# privately owned rights. Reference herein to any specific commercial product, +# process, or service by trade name, trademark, manufacturer, or otherwise +# does not necessarily constitute or imply its endorsement, recommendation, or +# favoring by the United States Government or any agency thereof, or +# Battelle Memorial Institute. The views and opinions of authors expressed +# herein do not necessarily state or reflect those of the +# United States Government or any agency thereof. +# +# PACIFIC NORTHWEST NATIONAL LABORATORY operated by +# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY +# under Contract DE-AC05-76RL01830 +# }}} +from pathlib import Path +import os +import shutil + +import argparse +import gevent +from volttron.platform.vip.agent.utils import build_agent + + +def run_tests(local_root="~/local_files", remote_root="~/remote_files"): + agent = build_agent(identity='trigger') + + local_root = Path(local_root).expanduser() + remote_root = Path(remote_root).expanduser() + + def build_remote_filename(filename): + os.makedirs(remote_root, exist_ok=True) + return str(Path(remote_root).joinpath(filename)) + + def build_local_filename(filename): + os.makedirs(local_root, exist_ok=True) + return str(Path(local_root).joinpath(filename)) + + def create_remote_file(filename, content): + full_path = build_remote_filename(filename) + with open(full_path, 'w') as fp: + fp.write(content) + return full_path + + def create_local_file(filename, content): + full_path = build_local_filename(filename) + with open(full_path, 'w') as fp: + fp.write(content) + return full_path + + def remove_files(): + shutil.rmtree(remote_root, ignore_errors=True) + shutil.rmtree(local_root, ignore_errors=True) + + remove_files() + + remote_path = create_remote_file("t1.txt", "this is f1") + local_path = build_local_filename("t1.after.transfer.txt") + + go = input(f"Test 1: rpc: trigger_download\n\tdownload remote_path: {remote_path}\n\tto local_path: {local_path} ") + result = agent.vip.rpc.call("scp.agent", "trigger_download", + remote_path=remote_path, + local_path=local_path).get() + print(f"The result was {result}\n") + + print(f"Creating test2 file") + remote_path = build_remote_filename("t2.remote.transfer.txt") + local_path = create_local_file("t2.txt", "This is test 2") + go = input(f"Test 2: rpc: trigger_upload\n\tupload local_path: {local_path}\n\tto remote_path: {remote_path} ") + result = agent.vip.rpc.call("scp.agent", "trigger_upload", + remote_path=remote_path, + local_path=local_path).get() + print(f"The result was {result}\n") + + print(f"Creating test3 file") + remote_path = build_remote_filename("t3.sent.pubsub.txt") + local_path = create_local_file("t3.txt", "This is test 3") + + go = input(f"Test 3: pubsub: SENDING\n\tlocal_path: {local_path}\n\tto remote_path: {remote_path} ") + + agent.vip.pubsub.publish(peer='pubsub', topic="transfer", message=dict(remote_path=remote_path, + local_path=local_path, + direction="SENDING")).get() + gevent.sleep(1) + print(f"The result is {Path(remote_path).exists()}\n") + print(f"Creating test4 file") + remote_path = create_remote_file("t4.receive.pubsub.txt", "This is test 4") + local_path = build_local_filename("t4.receive.txt") + + go = input(f"Test 4: pubsub: RECEIVING\n\tlocal_path: {local_path}\n\tfrom remote_path: {remote_path} ") + agent.vip.pubsub.publish(peer='pubsub', topic="transfer", message=dict(remote_path=remote_path, + local_path=local_path, + direction="RECEIVING")).get() + gevent.sleep(1) + print(f"The result is {Path(local_path).exists()}\n") + agent.core.stop() + print("Complete") + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("-l", "--local_root", help="Local path", default="~/local_root") + parser.add_argument("-r", "--remote_root", help="Remote path", default="~/remote_root") + + args = parser.parse_args() + run_tests(args.local_root, args.remote_root) diff --git a/pytest.ini b/pytest.ini index f206313257..d500024eb5 100644 --- a/pytest.ini +++ b/pytest.ini @@ -52,6 +52,7 @@ markers = rmq_reconnect: rabbitmq reconnect tests rmq_shutdown: rabbitmq shutdown tests secure: Test platform and agents with secure platform options + rpc: Tests for RPC mysqlfuncts: level one integration tests for mysqlfuncts postgresqlfuncts: level one integration tests for postgresqlfuncts dbutils: test all the level one integrations tests for dbfuncts classes diff --git a/volttron/platform/keystore.py b/volttron/platform/keystore.py index d47087592a..1dfe7d73bb 100644 --- a/volttron/platform/keystore.py +++ b/volttron/platform/keystore.py @@ -135,8 +135,24 @@ def get_agent_keystore_path(identity=None): def generate_keypair_dict(): """Generate and return new keypair as dictionary""" public, secret = curve_keypair() - return {'public': encode_key(public), - 'secret': encode_key(secret)} + encoded_public = encode_key(public) + encoded_secret = encode_key(secret) + attempts = 0 + max_attempts = 3 + + done = False + while not done and attempts < max_attempts: + # Keys that start with '-' are hard to use and cause issues with the platform + if encoded_secret.startswith('-') or encoded_public.startswith('-'): + # try generating public and secret key again + public, secret = curve_keypair() + encoded_public = encode_key(public) + encoded_secret = encode_key(secret) + else: + done = True + + return {'public': encoded_public, + 'secret': encoded_secret} def generate(self): """Generate and store new key pair""" diff --git a/volttron/platform/vip/agent/subsystems/rpc.py b/volttron/platform/vip/agent/subsystems/rpc.py index 7945f2dc4c..57c6e5d7b7 100644 --- a/volttron/platform/vip/agent/subsystems/rpc.py +++ b/volttron/platform/vip/agent/subsystems/rpc.py @@ -168,11 +168,19 @@ def method(self, request, ident, name, args, kwargs, del local.request del local.batch - def _inspect(self, method): - params = inspect.getargspec(method) - if hasattr(method, 'im_self'): - params.args.pop(0) - response = {'params': params} + @staticmethod + def _inspect(method): + response = {'params': {}} + signature = inspect.signature(method) + for p in signature.parameters.values(): + response['params'][p.name] = { + 'kind': p.kind.name + } + if p.default is not inspect.Parameter.empty: + response['params'][p.name]['default'] = p.default + if p.annotation is not inspect.Parameter.empty: + annotation = p.annotation.__name__ if type(p.annotation) is type else str(p.annotation) + response['params'][p.name]['annotation'] = annotation doc = inspect.getdoc(method) if doc: response['doc'] = doc @@ -181,15 +189,16 @@ def _inspect(self, method): cut = len(os.path.commonprefix([_ROOT_PACKAGE_PATH, source])) source = source[cut:] lineno = inspect.getsourcelines(method)[1] - except IOError: + except Exception: pass else: - response['source'] = source, lineno - try: - # pylint: disable=protected-access - response['return'] = method._returns - except AttributeError: - pass + response['source'] = { + 'file': source, + 'line_number': lineno + } + ret = signature.return_annotation + if ret is not inspect.Signature.empty: + response['return'] = ret.__name__ if type(ret) is type else str(ret) return response diff --git a/volttrontesting/subsystems/test_rpc_subsystem.py b/volttrontesting/subsystems/test_rpc_subsystem.py new file mode 100644 index 0000000000..4ef775ce92 --- /dev/null +++ b/volttrontesting/subsystems/test_rpc_subsystem.py @@ -0,0 +1,51 @@ +import os +import pytest +import inspect +from volttron.platform.vip.agent import RPC +from volttron.platform.vip.agent import Agent +from typing import Optional, Union, List + +class _ExporterTestAgent(Agent): + def __init__(self, **kwargs): + super(_ExporterTestAgent, self).__init__(**kwargs) + + @RPC.export('test_method') + def test_method(self, param1: int, param2: Union[str, List[str]], *, param3: bool = True, + param4: Optional[Union[float, List[float]]] = None) -> dict: + """Doc String""" + return {'param1': param1, 'param2': param2, param3: param3, 'param4': param4} + + +@pytest.mark.rpc +def test_method_inspection(volttron_instance): + """ Tests RPC Method Inspection + + :param volttron_instance: + :return: + """ + + lineno = inspect.getsourcelines(_ExporterTestAgent.test_method)[1] + test_output = { + 'doc': 'Doc String', + 'params': {'param1': {'annotation': 'int', + 'kind': 'POSITIONAL_OR_KEYWORD'}, + 'param2': {'annotation': 'typing.Union[str, typing.List[str]]', + 'kind': 'POSITIONAL_OR_KEYWORD'}, + 'param3': {'annotation': 'bool', + 'default': True, + 'kind': 'KEYWORD_ONLY'}, + 'param4': {'annotation': 'typing.Union[float, typing.List[float], ' + 'NoneType]', + 'default': None, + 'kind': 'KEYWORD_ONLY'}}, + 'return': 'dict', + 'source': {'file': 'volttrontesting/subsystems/test_rpc_subsystem.py', # Must change if this file moves! + 'line_number': lineno}, + } + + new_agent1 = volttron_instance.build_agent(identity='test_inspect1', agent_class=_ExporterTestAgent) + new_agent2 = volttron_instance.build_agent(identity='test_inspect2') + + result = new_agent2.vip.rpc.call('test_inspect1', 'test_method.inspect').get() + + assert result == test_output diff --git a/volttrontesting/utils/platformwrapper.py b/volttrontesting/utils/platformwrapper.py index 63473c3b29..885dab3e8f 100644 --- a/volttrontesting/utils/platformwrapper.py +++ b/volttrontesting/utils/platformwrapper.py @@ -784,7 +784,8 @@ def subscribe_to_all(peer, sender, bus, topic, headers, messages): # can enable the WebAdminApi. if self.ssl_auth: self._web_admin_api = WebAdminApi(self) - + + gevent.sleep(10) def is_running(self):