Skip to content

Commit

Permalink
Merge pull request #2377 from sgilbride/stand_alone_csv
Browse files Browse the repository at this point in the history
Stand Alone File Watch Publisher Agent
  • Loading branch information
craig8 authored Jun 23, 2020
2 parents 9c2c436 + 038244b commit 10e75a7
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 2 deletions.
55 changes: 55 additions & 0 deletions examples/StandAloneFileWatcher/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# This config is used by the agent to determine which files to watch
# and which topic additions to that file should be published on.
# The config needs to be taken in as a list, regardless of the number
# of entries.
#
# Example config:
# config = [
# {
# "file": "/full/path/to/file",
# "topic": "topic/to/publish/changes"
# }
# ]

config = [
{
"file": "/var/log/syslog",
"topic": "platform/syslog"
},
{
"file": "/volttron/tempfile.csv",
"topic": "temp/filepublisher"
}
]

# The parameters dictionary is used to populate the agent's
# remote vip address.
_params = {
# The root of the address.
# Note:
# 1. volttron instance should be configured to use tcp. use command vcfg
# to configure
'vip_address': 'tcp://127.0.0.1',
'port': 22916,

# public and secret key for the standalonelistener agent.
# These can be created using the command: volttron-ctl auth keypair
# public key should also be added to the volttron instance auth
# configuration to enable standalone agent access to volttron instance. Use
# command 'vctl auth add' Provide this agent's public key when prompted
# for credential.

'agent_public': 'rn_V3vxTLMwaIRUEIAIHee4-qM8X70irDThcn_TX6FA',
'agent_secret': 'DY4FhighTlv9UjKlCNIh-1WKp-M5nIHJacWsPZ0ixEs',

# Public server key from the remote platform. This can be
# obtained using the command:
# volttron-ctl auth serverkey
'server_key': 'yqhjAOOeRO3Ls6Bih-QoplMWfMcIJJtg_H4uGS8Z9lw'
}


def remote_url():
return "{vip_address}:{port}?serverkey={server_key}" \
"&publickey={agent_public}&" \
"secretkey={agent_secret}".format(**_params)
142 changes: 142 additions & 0 deletions examples/StandAloneFileWatcher/standalonefilewatchpublisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
'''
This python script will listen to the specified files and publish
updates to specific topics on the remote instance.
Setup:
~~~~~
1. Make sure volttron instance is running using tcp address. use vcfg
command to configure the volttron instance address.
2. Update settings.py
3. Add this standalone agent to volttron auth entry using vctl auth add
command. Provide ip of the volttron instance when prompted for
address[]: and provide public key of standalone agent when prompted
for credentials[]:
For more details see
https://volttron.readthedocs.io/en/develop/devguides/walkthroughs/Agent-Authentication-Walkthrough.html
Example command:
.. code-block:: console
(volttron)[vdev@cs_cbox myvolttron]$ vctl auth add
domain []:
address []: 127.0.0.1
user_id []:
capabilities (delimit multiple entries with comma) []:
roles (delimit multiple entries with comma) []:
groups (delimit multiple entries with comma) []:
mechanism [CURVE]:
credentials []: rn_V3vxTLMwaIRUEIAIHee4-qM8X70irDThcn_TX6FA
comments []:
enabled [True]:
4. With a volttron activated shell this script can be run like:
python standalonefilewatchpublisher.py
'''

from datetime import datetime
import os
import sys

import gevent
import logging


from volttron.platform.vip.agent import Agent, PubSub, RPC, Core
from volttron.platform.agent import utils
from volttron.platform.agent.utils import watch_file_with_fullpath
from volttron.platform import jsonapi


# These are the options that can be set from the settings module.
from settings import remote_url, config

# Setup logging so that we could use it if we needed to.
utils.setup_logging()
_log = logging.getLogger(__name__)

logging.basicConfig(
level=logging.debug,
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%m-%d-%y %H:%M:%S')


class StandAloneFileWatchPublisher(Agent):
''' A standalone version of the FileWatcherPublisher'''
def __init__(self, **kwargs):
super(StandAloneFileWatchPublisher, self).__init__(**kwargs)
self.config = config
items = config[:]
self.file_topic = {}
self.file_end_position = {}
for item in self.config:
file = item["file"]
self.file_topic[file] = item["topic"]
if os.path.isfile(file):
with open(file, 'r') as f:
self.file_end_position[file] = self.get_end_position(f)
else:
_log.error("File " + file + " does not exists. Ignoring this file.")
items.remove(item)
self.config = items

def onmessage(self, peer, sender, bus, topic, headers, message):
'''Handle incoming messages on the bus.'''
d = {'topic': topic, 'headers': headers, 'message': message}
sys.stdout.write(jsonapi.dumps(d)+'\n')

@Core.receiver('onstart')
def starting(self, sender, **kwargs):
_log.info("Starting "+self.__class__.__name__+" agent")
if len(self.config) == 0 :
_log.error("No file to watch and publish. Stopping "+self.__class__.__name__+" agent.")
gevent.spawn_later(3, self.core.stop)
else:
for item in self.config:
file = item["file"]
self.core.spawn(watch_file_with_fullpath, file, self.read_file)

def read_file(self, file):
_log.debug('loading file %s', file)
with open(file, 'r') as f:
f.seek(self.file_end_position[file])
for line in f:
self.publish_file(line.strip(),self.file_topic[file])
self.file_end_position[file] = self.get_end_position(f)

def publish_file(self, line, topic):
message = {'timestamp': datetime.utcnow().isoformat() + 'Z',
'line': line}
_log.debug('publishing message {} on topic {}'.format(message, topic))
self.vip.pubsub.publish(peer="pubsub", topic=topic,
message=message)

def get_end_position(self, f):
f.seek(0, 2)
return f.tell()


if __name__ == '__main__':
try:
# If stdout is a pipe, re-open it line buffered
if utils.isapipe(sys.stdout):
# Hold a reference to the previous file object so it doesn't
# get garbage collected and close the underlying descriptor.
stdout = sys.stdout
sys.stdout = os.fdopen(stdout.fileno(), 'w', 1)

print(remote_url())
agent = StandAloneFileWatchPublisher(address=remote_url(),
identity='standalone.filewatchpublisher')
task = gevent.spawn(agent.core.run)
try:
task.join()
finally:
task.kill()
except KeyboardInterrupt:
pass
4 changes: 2 additions & 2 deletions volttron/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class AbsolutePathFileReloader(PatternMatchingEventHandler):
filetowatch *.json will watch all json files in <volttron_home>
"""
def __init__(self, filetowatch, callback):
super(VolttronHomeFileReloader, self).__init__([filetowatch])
super(AbsolutePathFileReloader, self).__init__([filetowatch])
self._callback = callback
self._filetowatch = filetowatch

Expand All @@ -134,7 +134,7 @@ def watchfile(self):
def on_any_event(self, event):
_log.debug("Calling callback on event {}. Calling {}".format(event, self._callback))
try:
self._callback()
self._callback(self._filetowatch)
except BaseException as e:
_log.error("Exception in callback: {}".format(e))
_log.debug("After callback on event {}".format(event))

0 comments on commit 10e75a7

Please sign in to comment.