Skip to content

Commit

Permalink
Fixed vctl install for rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
craig8 committed Jul 15, 2021
1 parent 71097a0 commit a92efb4
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 46 deletions.
193 changes: 156 additions & 37 deletions volttron/platform/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
# }}}

import argparse
import base64
import collections
import hashlib
import logging
Expand Down Expand Up @@ -391,6 +392,136 @@ def identity_exists(self, identity):

return self._identity_exists(identity)

@RPC.export
def install_agent_rmq(self, vip_identity, filename, topic, force, response_topic):
"""
Install the agent through the rmq message bus.
"""
peer = self.vip.rpc.context.vip_message.peer

protocol_message = None
protocol_headers = None
response_received = False

def protocol_subscription(peer, sender, bus, topic, headers, message):
nonlocal protocol_message, protocol_headers, response_received
_log.debug(f"Received topic, message topic {topic}, {message}")
protocol_message = message
protocol_message = base64.b64decode(protocol_message.encode('utf-8'))
protocol_headers = headers
response_received = True

agent_uuid, agent_existed_before = self._identity_exists_but_no_force(vip_identity, force)
try:
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, os.path.basename(filename))
store = open(path, 'wb')
sha512 = hashlib.sha512()

try:
request_checksum = base64.b64encode(jsonapi.dumps(['checksum']).encode('utf-8')).decode('utf-8')
request_fetch = base64.b64encode(jsonapi.dumps(['fetch', 1024]).encode('utf-8')).decode('utf-8')

_log.debug(f"Server subscribing to {topic}")
self.vip.pubsub.subscribe(peer="pubsub", prefix=topic, callback=protocol_subscription).get(timeout=5)
gevent.sleep(5)
_log.debug("AFTER SUBSCRIPTION")
while True:

_log.debug(f"Requesting data {request_fetch} sending to {response_topic}")
response_received = False

# request a chunk of the filecl
self.vip.pubsub.publish("pubsub",
topic=response_topic,
message=request_fetch).get(timeout=5)
# chunk binary representation of the bytes read from
# the other side of the connectoin
with gevent.Timeout(30):
_log.debug("Waiting for chunk")
while not response_received:
gevent.sleep(0.1)

# Chunk will be bytes
chunk = protocol_message
_log.debug(f"chunk is {chunk}")
if chunk == b'complete':
_log.debug(f"File transfer complete!")
break

sha512.update(chunk)
store.write(chunk)

with gevent.Timeout(30):
_log.debug("Requesting checksum")
response_received = False
self.vip.pubsub.publish("pubsub",
topic=response_topic, message=request_checksum).get(timeout=5)

while not response_received:
gevent.sleep(0.1)

checksum = protocol_message
assert checksum == sha512.digest()

_log.debug("Outside of while loop in install agent service.")

except AssertionError:
_log.warning("Checksum mismatch on received file")
raise
except gevent.Timeout:
_log.warning("Gevent timeout trying to receive data")
raise
finally:
store.close()
self.vip.pubsub.unsubscribe("pubsub", response_topic, protocol_subscription)
_log.debug('Unsubscribing on server')

_log.debug("After transfering wheel to us now to do stuff.")
agent_data_dir = None
backup_agent_file = None

agent_uuid = self._install_wheel_to_platform(agent_uuid, vip_identity, path, agent_existed_before)
return agent_uuid
finally:
shutil.rmtree(tmpdir, ignore_errors=True)

def _install_wheel_to_platform(self, agent_uuid, vip_identity, path, agent_existed_before):
agent_data_dir = None
backup_agent_file = None
# Fix unbound variable. Only gets set if there is an existing agent already.
publickey = None
secretkey = None
# Note if this is anything then we know we have already got an agent
# mapped to the identity.
if agent_uuid:
_log.debug(f"There is an existing agent {agent_uuid}")
backup_agent_file = "/tmp/{}.tar.gz".format(agent_uuid)
if agent_uuid:
agent_data_dir = self._aip.create_agent_data_dir_if_missing(agent_uuid)

if agent_data_dir:
backup_agent_data(backup_agent_file, agent_data_dir)

keystore = self._aip.get_agent_keystore(agent_uuid)
publickey = keystore.public
secretkey = keystore.secret
_log.info('Removing previous version of agent "{}"\n'
.format(vip_identity))
self.remove_agent(agent_uuid)
_log.debug("Calling aip install_agent.")
agent_uuid = self._aip.install_agent(path,
vip_identity=vip_identity,
publickey=publickey,
secretkey=secretkey)

if agent_existed_before and backup_agent_file is not None:
restore_agent_data_from_tgz(backup_agent_file,
self._aip.create_agent_data_dir_if_missing(agent_uuid))
_log.debug(f"Returning {agent_uuid}")
return agent_uuid


@RPC.export
def install_agent(self, filename, channel_name, vip_identity=None,
publickey=None, secretkey=None, force=False):
Expand Down Expand Up @@ -446,14 +577,7 @@ def install_agent(self, filename, channel_name, vip_identity=None,

# at this point if agent_uuid is populated then there is an
# identity of that already available.
agent_uuid = None
if vip_identity:
agent_uuid = self._identity_exists(vip_identity)
agent_existed_before = False
if agent_uuid:
agent_existed_before = True
if not force:
raise ValueError("Identity already exists, but not forced!")
agent_uuid, agent_existed_before = self._identity_exists_but_no_force(vip_identity, force)
_log.debug(f"rpc: install_agent {agent_uuid}")
# Prepare to install agent that is passed over to us.
peer = self.vip.rpc.context.vip_message.peer
Expand Down Expand Up @@ -505,39 +629,31 @@ def install_agent(self, filename, channel_name, vip_identity=None,
del channel

_log.debug("After transfering wheel to us now to do stuff.")
agent_data_dir = None
backup_agent_file = None
# Note if this is anything then we know we have already got an agent
# mapped to the identity.
if agent_uuid:
_log.debug(f"There is an existing agent {agent_uuid}")
backup_agent_file = "/tmp/{}.tar.gz".format(agent_uuid)
if agent_uuid:
agent_data_dir = self._aip.create_agent_data_dir_if_missing(agent_uuid)

if agent_data_dir:
backup_agent_data(backup_agent_file, agent_data_dir)

keystore = self._aip.get_agent_keystore(agent_uuid)
publickey = keystore.public
secretkey = keystore.secret
_log.info('Removing previous version of agent "{}"\n'
.format(vip_identity))
self.remove_agent(agent_uuid)
_log.debug("Calling aip install_agent.")
agent_uuid = self._aip.install_agent(path,
vip_identity=vip_identity,
publickey=publickey,
secretkey=secretkey)

if agent_existed_before and backup_agent_file is not None:
restore_agent_data_from_tgz(backup_agent_file,
self._aip.create_agent_data_dir_if_missing(agent_uuid))
_log.debug(f"Returning {agent_uuid}")
agent_uuid = self._install_wheel_to_platform(agent_uuid, vip_identity, path, agent_existed_before)
return agent_uuid
finally:
shutil.rmtree(tmpdir, ignore_errors=True)

def _identity_exists_but_no_force(self, vip_identity: str, force: bool):
"""
This will raise a ValueError if the identity passed exists but
force was not True when this function is called.
This function should be called before any agent is installed through
the respective message buses.
"""
# at this point if agent_uuid is populated then there is an
# identity of that already available.
agent_uuid = None
if vip_identity:
agent_uuid = self._identity_exists(vip_identity)
agent_existed_before = False
if agent_uuid:
agent_existed_before = True
if not force:
raise ValueError("Identity already exists, but not forced!")
return agent_uuid, agent_existed_before

def _identity_exists(self, identity:str) -> Optional[str]:
"""
Determines if an agent identity is already installed. This
Expand Down Expand Up @@ -3127,6 +3243,9 @@ def add_parser(*args, **kwargs) -> argparse.ArgumentParser:
opts.connection = None
if utils.is_volttron_running(volttron_home):
opts.connection = ControlConnection(opts.vip_address)

with gevent.Timeout(opts.timeout):
return opts.func(opts)

try:
with gevent.Timeout(opts.timeout):
Expand Down
126 changes: 117 additions & 9 deletions volttron/platform/install_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
# }}}

import argparse
import base64
import hashlib
import logging
import os
Expand Down Expand Up @@ -244,12 +245,110 @@ def send_agent(connection: "ControlConnection", wheel_file: str, vip_identity: s
path = wheel_file
peer = connection.peer
server = connection.server
_log.debug(f"server type is {type(server)} {type(server.core)}")

wheel = open(path, 'rb')
_log.debug(f"Connecting to {peer} to install {path}")
channel = server.vip.channel(peer, 'agent_sender')
channel = None
rmq_send_topic = None
rmq_response_topic = None

if server.core.messagebus == 'zmq':
channel = server.vip.channel(peer, 'agent_sender')
elif server.core.messagebus == 'rmq':
rmq_send_topic = "agent_sender"
rmq_response_topic = "request_data"
else:
raise ValueError("Unknown messagebus detected")

def send_rmq():
nonlocal wheel, server

sha512 = hashlib.sha512()
protocol_message = None
protocol_headers = None
response_received = False

def protocol_requested(peer, sender, bus, topic, headers, message):
nonlocal protocol_message, protocol_headers, response_received

protocol_message = message
protocol_message = base64.b64decode(protocol_message.encode('utf-8'))
protocol_headers = headers
response_received = True

try:
first = True
op = None
size = None
_log.debug(f"Subscribing to {rmq_response_topic}")
server.vip.pubsub.subscribe(peer="pubsub",
prefix=rmq_response_topic,
callback=protocol_requested).get(timeout=5)
gevent.sleep(5)
_log.debug(f"Publishing to {rmq_send_topic}")
while True:
if first:
_log.debug("Waiting for a fetch")
# Wait until we get the first request.
with gevent.Timeout(30):
while not response_received:
gevent.sleep(0.1)

first = False
resp = jsonapi.loads(protocol_message)
_log.debug(f"Got first response {resp}")

def send():
if len(resp) > 1:
op, size = resp
else:
op = resp[0]

if op != 'fetch':
raise ValueError(f'First channel response must be fetch but was {op}')
response_received = False
if op == 'fetch':
chunk = wheel.read(size)
if chunk:
_log.debug(f"Op was fetch sending {size}")
sha512.update(chunk)
# Needs a string to go across the messagebus.
message = base64.b64encode(chunk).decode('utf-8')
server.vip.pubsub.publish(peer="pubsub", topic=rmq_send_topic, message=message).get(timeout=10)
else:
_log.debug(f"Op was fetch sending complete")
message = base64.b64encode(b'complete').decode('utf-8')
server.vip.pubsub.publish(peer="pubsub", topic=rmq_send_topic, message=message).get(timeout=10)
gevent.sleep(10)
break
elif op == 'checksum':
_log.debug(f"sending checksum {sha512.hexdigest()}")
message = base64.b64encode(sha512.digest()).decode('utf-8')
server.vip.pubsub.publish("pubsub", topic=rmq_send_topic, message=message).get(timeout=10)

_log.debug("Waiting for next response")

with gevent.Timeout(30):
while not response_received:
gevent.sleep(0.1)
_log.debug(f"Response received bottom of loop {protocol_message}")
# wait for next response
resp = jsonapi.loads(protocol_message)

# [fetch, size] or checksum
if len(resp) > 1:
op, size = resp
else:
op = resp[0]

finally:
_log.debug("Closing wheel and unsubscribing.")
wheel.close()
server.vip.pubsub.unsubscribe(peer="pubsub", prefix="rmq_response_topic", callback=protocol_requested)



def send_zmq():
nonlocal wheel, channel
sha512 = hashlib.sha512()
try:
Expand All @@ -274,7 +373,7 @@ def send():
op = resp[0]

if op != 'fetch':
raise ValueError(f'First channel response must be fetch but was {fetch}')
raise ValueError(f'First channel response must be fetch but was {op}')

if op == 'fetch':
chunk = wheel.read(size)
Expand Down Expand Up @@ -306,15 +405,24 @@ def send():
channel.close(linger=0)
del channel

_log.debug(f"calling install_agent on {peer} using channel {channel.name}")
result = server.vip.rpc.call(
peer, 'install_agent', os.path.basename(path), channel.name,
vip_identity, publickey, secretkey, force)

task = gevent.spawn(send)
if server.core.messagebus == 'rmq':
_log.debug(f"calling install_agent on {peer} sending to topic {rmq_send_topic}")
task = gevent.spawn(send_rmq)
result = server.vip.rpc.call(
peer, 'install_agent_rmq', vip_identity, os.path.basename(path), rmq_send_topic, force, rmq_response_topic)
elif server.core.messagebus == 'zmq':
_log.debug(f"calling install_agent on {peer} using channel {channel.name}")
task = gevent.spawn(send_zmq)
result = server.vip.rpc.call(
peer, 'install_agent', os.path.basename(path), channel.name, vip_identity, publickey, secretkey, force)

else:
raise ValueError("Unknown messagebus detected!")

result.rawlink(lambda glt: task.kill(block=False))
_log.debug("Completed sending of agent across.")
gevent.wait([result])
_log.debug("Completed sending of agent across.")
_log.debug(f"After wait result is {result}")
return result

Expand Down

0 comments on commit a92efb4

Please sign in to comment.