@RPC.export
def install_agent_rmq(self, vip_identity, filename, topic, force, response_topic):
    """
    Install an agent wheel that is streamed to this service over the rmq
    message bus.

    The wheel is pulled from the sender through a request/response
    exchange on two pubsub topics.  Requests (``['fetch', 1024]`` and
    ``['checksum']``, json encoded) are published on ``response_topic``
    and the replies are received through a subscription on ``topic``.
    Every payload crosses the bus as a base64 encoded string.  The
    transfer ends when the sender replies with ``b'complete'``.

    :param vip_identity: identity the agent is installed under.
    :param filename: name of the wheel; only its basename is used, to
        name the temporary copy written on this side.
    :param topic: topic the wheel data and checksums are received on.
    :param force: passed to ``_identity_exists_but_no_force``; must be
        true to replace an agent that already holds ``vip_identity``.
    :param response_topic: topic the fetch/checksum requests are
        published on.
    :return: the uuid returned by ``_install_wheel_to_platform``.
    :raises ValueError: from ``_identity_exists_but_no_force`` when the
        identity exists and ``force`` is not set.
    :raises AssertionError: a received checksum differs from the digest
        computed over the data written locally.
    :raises gevent.Timeout: a reply did not arrive within 30 seconds.
    """
    protocol_message = None
    response_received = False

    def protocol_subscription(peer, sender, bus, topic, headers, message):
        # Pubsub callback: keep the decoded payload and flag its arrival
        # for the polling loops below.
        nonlocal protocol_message, response_received
        _log.debug(f"Received topic, message topic {topic}, {message}")
        protocol_message = base64.b64decode(message.encode('utf-8'))
        response_received = True

    agent_uuid, agent_existed_before = self._identity_exists_but_no_force(vip_identity, force)

    # The requests never change, so encode them once up front.
    request_checksum = base64.b64encode(jsonapi.dumps(['checksum']).encode('utf-8')).decode('utf-8')
    request_fetch = base64.b64encode(jsonapi.dumps(['fetch', 1024]).encode('utf-8')).decode('utf-8')

    # Created before the ``try`` so the cleanup in ``finally`` can never
    # reference an unbound name if mkdtemp itself fails.
    tmpdir = tempfile.mkdtemp()
    try:
        path = os.path.join(tmpdir, os.path.basename(filename))
        sha512 = hashlib.sha512()

        with open(path, 'wb') as store:
            try:
                _log.debug(f"Server subscribing to {topic}")
                self.vip.pubsub.subscribe(peer="pubsub",
                                          prefix=topic,
                                          callback=protocol_subscription).get(timeout=5)
                # NOTE(review): presumably gives the subscription time to
                # become active before the first request - confirm.
                gevent.sleep(5)
                _log.debug("AFTER SUBSCRIPTION")
                while True:
                    _log.debug(f"Requesting data {request_fetch} sending to {response_topic}")
                    response_received = False

                    # Request the next chunk of the file.
                    self.vip.pubsub.publish("pubsub",
                                            topic=response_topic,
                                            message=request_fetch).get(timeout=5)
                    # The callback stores the bytes read on the other
                    # side of the connection in protocol_message.
                    with gevent.Timeout(30):
                        _log.debug("Waiting for chunk")
                        while not response_received:
                            gevent.sleep(0.1)

                    # Chunk will be bytes
                    chunk = protocol_message
                    _log.debug(f"chunk is {chunk}")
                    if chunk == b'complete':
                        _log.debug("File transfer complete!")
                        break

                    sha512.update(chunk)
                    store.write(chunk)

                    # NOTE(review): the per-chunk placement of this check
                    # is inferred from the sender, which stops answering
                    # once it has sent 'complete' - confirm.
                    with gevent.Timeout(30):
                        _log.debug("Requesting checksum")
                        response_received = False
                        self.vip.pubsub.publish("pubsub",
                                                topic=response_topic,
                                                message=request_checksum).get(timeout=5)

                        while not response_received:
                            gevent.sleep(0.1)

                    # Raised explicitly instead of using ``assert`` so the
                    # integrity check survives ``python -O``; the exception
                    # type is kept for existing callers.
                    if protocol_message != sha512.digest():
                        raise AssertionError("Checksum mismatch on received file")

                _log.debug("Outside of while loop in install agent service.")

            except AssertionError:
                _log.warning("Checksum mismatch on received file")
                raise
            except gevent.Timeout:
                _log.warning("Gevent timeout trying to receive data")
                raise
            finally:
                # Remove the subscription made above; it was registered
                # on ``topic``, not on ``response_topic``.
                self.vip.pubsub.unsubscribe("pubsub", topic, protocol_subscription)
                _log.debug('Unsubscribing on server')

        _log.debug("After transfering wheel to us now to do stuff.")
        agent_uuid = self._install_wheel_to_platform(agent_uuid, vip_identity, path, agent_existed_before)
        return agent_uuid
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


def _install_wheel_to_platform(self, agent_uuid, vip_identity, path, agent_existed_before):
    """
    Install the wheel at ``path`` under ``vip_identity``, replacing the
    agent identified by ``agent_uuid`` when there is one.

    When ``agent_uuid`` is truthy the existing agent's data directory is
    archived, the public/secret keys from its keystore are carried over to
    the new installation and the old agent is removed before the wheel is
    installed.  The archived data is restored into the new agent's data
    directory afterwards.

    :param agent_uuid: uuid of the agent currently mapped to the
        identity, or a falsy value for a fresh install.
    :param vip_identity: identity passed to ``self._aip.install_agent``.
    :param path: path of the wheel passed to ``self._aip.install_agent``.
    :param agent_existed_before: whether archived data should be restored
        after the install.
    :return: the uuid returned by ``self._aip.install_agent``.
    """
    backup_agent_file = None
    # Only set when an existing agent is replaced; a fresh install hands
    # None for both keys to install_agent.
    publickey = None
    secretkey = None

    # A truthy uuid means an agent is already mapped to the identity.
    if agent_uuid:
        _log.debug(f"There is an existing agent {agent_uuid}")
        agent_data_dir = self._aip.create_agent_data_dir_if_missing(agent_uuid)

        if agent_data_dir:
            # Recorded only once an archive is really written so the
            # restore below never runs against a file that does not exist.
            # NOTE(review): predictable file name in /tmp that is never
            # removed - consider tempfile.mkstemp plus cleanup.
            backup_agent_file = "/tmp/{}.tar.gz".format(agent_uuid)
            backup_agent_data(backup_agent_file, agent_data_dir)

        keystore = self._aip.get_agent_keystore(agent_uuid)
        publickey = keystore.public
        secretkey = keystore.secret
        _log.info('Removing previous version of agent "{}"\n'
                  .format(vip_identity))
        self.remove_agent(agent_uuid)

    _log.debug("Calling aip install_agent.")
    agent_uuid = self._aip.install_agent(path,
                                         vip_identity=vip_identity,
                                         publickey=publickey,
                                         secretkey=secretkey)

    if agent_existed_before and backup_agent_file is not None:
        restore_agent_data_from_tgz(backup_agent_file,
                                    self._aip.create_agent_data_dir_if_missing(agent_uuid))
    _log.debug(f"Returning {agent_uuid}")
    return agent_uuid
def _identity_exists_but_no_force(self, vip_identity: str, force: bool):
    """
    Look up the agent currently holding ``vip_identity`` and refuse to
    continue when one exists and ``force`` is not set.

    Call this before an agent is installed through either message bus.

    :param vip_identity: identity to look up; a falsy value skips the
        lookup.
    :param force: when true an existing agent does not raise.
    :return: ``(agent_uuid, agent_existed_before)`` where ``agent_uuid``
        is the result of ``self._identity_exists`` (``None`` when no
        lookup was made) and the flag tells whether it was truthy.
    :raises ValueError: the identity exists and ``force`` is not true.
    """
    existing_uuid = self._identity_exists(vip_identity) if vip_identity else None

    # Nothing holds the identity: there is nothing to guard against.
    if not existing_uuid:
        return existing_uuid, False

    if not force:
        raise ValueError("Identity already exists, but not forced!")
    return existing_uuid, True
def send_rmq():
    """
    Stream the wheel to the platform over the rmq message bus.

    Requests arrive base64 encoded on ``rmq_response_topic`` and decode to
    a json list, either ``['fetch', size]`` or ``['checksum']``.  Each
    reply is published base64 encoded on ``rmq_send_topic``: the next
    ``size`` bytes of the wheel, ``b'complete'`` once the wheel is
    exhausted, or the sha512 digest of everything sent so far.

    The wheel is closed and the subscription removed when the function
    exits, whether or not the transfer succeeded.

    :raises ValueError: the first request received is not a ``fetch``.
    :raises gevent.Timeout: no request arrived within 30 seconds.
    """
    sha512 = hashlib.sha512()
    protocol_message = None
    response_received = False

    def protocol_requested(peer, sender, bus, topic, headers, message):
        # Pubsub callback: keep the decoded request and flag its arrival
        # for the polling loop below.
        nonlocal protocol_message, response_received
        protocol_message = base64.b64decode(message.encode('utf-8'))
        response_received = True

    def publish(payload):
        # Needs a string to go across the messagebus.
        message = base64.b64encode(payload).decode('utf-8')
        server.vip.pubsub.publish(peer="pubsub", topic=rmq_send_topic, message=message).get(timeout=10)

    try:
        first = True
        op = None
        size = None
        _log.debug(f"Subscribing to {rmq_response_topic}")
        server.vip.pubsub.subscribe(peer="pubsub",
                                    prefix=rmq_response_topic,
                                    callback=protocol_requested).get(timeout=5)
        # NOTE(review): presumably gives the subscription time to become
        # active before requests are expected - confirm.
        gevent.sleep(5)
        _log.debug(f"Publishing to {rmq_send_topic}")
        while True:
            _log.debug("Waiting for a fetch" if first else "Waiting for next response")
            with gevent.Timeout(30):
                while not response_received:
                    gevent.sleep(0.1)

            if not first:
                _log.debug(f"Response received bottom of loop {protocol_message}")
            resp = jsonapi.loads(protocol_message)
            if first:
                _log.debug(f"Got first response {resp}")

            # [fetch, size] or [checksum]; a request without a size keeps
            # the size of the previous fetch.
            if len(resp) > 1:
                op, size = resp
            else:
                op = resp[0]

            if first:
                if op != 'fetch':
                    raise ValueError(f'First channel response must be fetch but was {op}')
                first = False

            # Re-arm the flag before replying so the next request is not
            # missed.
            response_received = False
            if op == 'fetch':
                chunk = wheel.read(size)
                if not chunk:
                    _log.debug("Op was fetch sending complete")
                    publish(b'complete')
                    # NOTE(review): presumably lets the final message be
                    # delivered before the subscription is torn down -
                    # confirm.
                    gevent.sleep(10)
                    break
                _log.debug(f"Op was fetch sending {size}")
                sha512.update(chunk)
                publish(chunk)
            elif op == 'checksum':
                _log.debug(f"sending checksum {sha512.hexdigest()}")
                publish(sha512.digest())

    finally:
        _log.debug("Closing wheel and unsubscribing.")
        wheel.close()
        # Use the topic variable that was subscribed above; a quoted
        # literal here would leave the subscription in place.
        server.vip.pubsub.unsubscribe(peer="pubsub", prefix=rmq_response_topic, callback=protocol_requested)
peer, 'install_agent_rmq', vip_identity, os.path.basename(path), rmq_send_topic, force, rmq_response_topic) + elif server.core.messagebus == 'zmq': + _log.debug(f"calling install_agent on {peer} using channel {channel.name}") + task = gevent.spawn(send_zmq) + result = server.vip.rpc.call( + peer, 'install_agent', os.path.basename(path), channel.name, vip_identity, publickey, secretkey, force) + + else: + raise ValueError("Unknown messagebus detected!") + result.rawlink(lambda glt: task.kill(block=False)) - _log.debug("Completed sending of agent across.") gevent.wait([result]) + _log.debug("Completed sending of agent across.") _log.debug(f"After wait result is {result}") return result