diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000000..182dcf65e8 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +# Handles CLRF/RF EOL issue +* text=auto \ No newline at end of file diff --git a/.github/workflows/pytest-auth.yml b/.github/workflows/pytest-auth.yml index 3a198cd8f7..843b9f60de 100644 --- a/.github/workflows/pytest-auth.yml +++ b/.github/workflows/pytest-auth.yml @@ -14,7 +14,7 @@ name: Testing platform auth # Determine what events are going to trigger a running of the workflow -on: [push, pull_request] +on: [pull_request, push] jobs: # The job named build @@ -25,7 +25,7 @@ jobs: fail-fast: false matrix: # Each entry in the os and python-version matrix will be run so for the 3 x 4 there will be 12 jobs run - os: [ ubuntu-16.04, ubuntu-18.04, ubuntu-20.04 ] + os: [ ubuntu-18.04, ubuntu-20.04 ] python-version: [ 3.6, 3.7] # , 3.8, 3.9 ] # Run-on determines the operating system available to run on @@ -38,50 +38,20 @@ jobs: # checkout the volttron repository and set current direectory to it - uses: actions/checkout@v2 - # Install erlang for rabbitmq - - name: Install erlang - if: matrix.os != 'ubuntu-20.04' - run: | - sudo scripts/rabbit_dependencies.sh debian ${{ matrix.os }} - # setup the python environment for the operating system - name: Set up Python ${{matrix.os}} ${{ matrix.python-version }} uses: actions/setup-python@v2 with: python-version: ${{ matrix.python-version }} - # Attempt to restore the cache from the build-dependency-cache workflow if present then - # the output value steps.check_files.outputs.files_exists will be set (see the next step for usage) - - name: Has restored cache - id: check_files - uses: andstor/file-existence-action@v1 - with: - files: "env/bin/activate" - - # This step is only run if the cache wasn't able to be restored. - - name: Install dependencies including rmq - if: steps.check_files.outputs.files_exists != 'true' && matrix.os != 'ubuntu-20.04' - run: | - pip install wheel - python bootstrap.py --all --rabbitmq --force - - - name: Install dependencies other than rmq - if: steps.check_files.outputs.files_exists != 'true' && matrix.os == 'ubuntu-20.04' - run: | - pip install wheel - python bootstrap.py --all --force - - - name: Install volttron - run: | - source env/bin/activate - pip install -e . - # Run the specified tests and save the results to a unique file that can be archived for later analysis. - - name: Run pytest - run: | - source env/bin/activate - pip install -e . - pytest volttrontesting/platform/auth_tests -rf -o junit_family=xunit2 --junitxml=output/test-auth-${{matrix.os}}-${{ matrix.python-version }}-results.xml + - name: Run pytest on ${{ matrix.python-version }}, ${{ matrix.os }} + uses: volttron/volttron-build-action@v1 + with: + python_version: ${{ matrix.python-version }} + os: ${{ matrix.os }} + test_path: volttrontesting/platform/auth_tests + test_output_suffix: auth # Archive the results from the pytest to storage. - name: Archive test results diff --git a/.github/workflows/pytest-dbutils.yml b/.github/workflows/pytest-dbutils.yml new file mode 100644 index 0000000000..ad5b85d218 --- /dev/null +++ b/.github/workflows/pytest-dbutils.yml @@ -0,0 +1,53 @@ +--- +# This workflow is meant as a foundational workflow for running integration/unit tests on the +# platform. For this workflow we are testing the +# +# volttrontesting/testutils directory using pytest. +# +# This workflow also shows the caching mechanisms available for storage +# and retrieval of cache for quicker setup of test environments. + +name: Testing dbutils directory +on: [push, pull_request] + +jobs: + build: + # The strategy allows customization of the build and allows matrixing the version of os and software + # https://docs.github.com/en/free-pro-team@l.atest/actions/reference/workflow-syntax-for-github-actions#jobsjob_idstrategy + strategy: + fail-fast: false + matrix: + # Each entry in the os and python-version matrix will be run so for the 3 x 4 there will be 12 jobs run + os: [ ubuntu-18.04, ubuntu-20.04 ] + python-version: [ 3.6, 3.7] # , 3.8, 3.9 ] + + runs-on: ${{ matrix.os }} + env: + TEST_TYPE: dbutils + steps: + # checkout the volttron repository and set current directory to it + - uses: actions/checkout@v2 + + # Attempt to restore the cache from the build-dependency-cache workflow if present then + # the output value steps.check_files.outputs.files_exists will be set (see the next step for usage) + - name: Set up Python ${{matrix.os}} ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + # Run the specified tests and save the results to a unique file that can be archived for later analysis. + - name: Run pytest on ${{ matrix.python-version }}, ${{ matrix.os }} + uses: volttron/volttron-build-action@v1 + with: + python_version: ${{ matrix.python-version }} + os: ${{ matrix.os }} + test_path: volttrontesting/platform/${{ env.TEST_TYPE }} + test_output_suffix: ${{ env.TEST_TYPE }} + +# Archive the results from the pytest to storage. + - name: Archive test results + uses: actions/upload-artifact@v2 + if: always() + with: + name: pytest-report + path: output/${{ env.TEST_TYPE }}-${{matrix.os}}-${{ matrix.python-version }}-results.xml diff --git a/.github/workflows/pytest-testutils.yml b/.github/workflows/pytest-testutils.yml index 88472e1309..44fc0c300c 100644 --- a/.github/workflows/pytest-testutils.yml +++ b/.github/workflows/pytest-testutils.yml @@ -1,6 +1,6 @@ --- # This workflow is meant as a foundational workflow for running integration/unit tests on the -# plaform. For this workflow we are testing the +# platform. For this workflow we are testing the # # volttrontesting/testutils directory using pytest. # @@ -18,21 +18,15 @@ jobs: fail-fast: false matrix: # Each entry in the os and python-version matrix will be run so for the 3 x 4 there will be 12 jobs run - os: [ ubuntu-16.04, ubuntu-18.04, ubuntu-20.04 ] + os: [ ubuntu-18.04, ubuntu-20.04 ] python-version: [ 3.6, 3.7] # , 3.8, 3.9 ] runs-on: ${{ matrix.os }} steps: - # checkout the volttron repository and set current direectory to it + # checkout the volttron repository and set current directory to it - uses: actions/checkout@v2 - # Install erlang for rabbitmq - - name: Install erlang - if: matrix.os != 'ubuntu-20.04' - run: | - sudo scripts/rabbit_dependencies.sh debian ${{ matrix.os }} - # Attempt to restore the cache from the build-dependency-cache workflow if present then # the output value steps.check_files.outputs.files_exists will be set (see the next step for usage) - name: Set up Python ${{matrix.os}} ${{ matrix.python-version }} @@ -40,46 +34,23 @@ jobs: with: python-version: ${{ matrix.python-version }} - # Determine if the cache was restored or not. - - name: Has restored cache - id: check_files - uses: andstor/file-existence-action@v1 - with: - files: "env/bin/activate" - - # This step is only run if the cache wasn't able to be restored. - - name: Install dependencies including rmq - if: steps.check_files.outputs.files_exists != 'true' && matrix.os != 'ubuntu-20.04' - run: | - pip install wheel - python bootstrap.py --all --rabbitmq --force - - - name: Install dependencies other than rmq - if: steps.check_files.outputs.files_exists != 'true' && matrix.os == 'ubuntu-20.04' - run: | - pip install wheel - python bootstrap.py --all --force - - - name: Install volttron - run: | - source env/bin/activate - pip install -e . - # Run the specified tests and save the results to a unique file that can be archived for later analysis. - - name: Run pytest - run: | - source env/bin/activate - pip install -e . - pytest volttrontesting/testutils -rf -o junit_family=xunit2 --junitxml=output/test-testutils-${{matrix.os}}-${{ matrix.python-version }}-results.xml + - name: Run pytest on ${{ matrix.python-version }}, ${{ matrix.os }} + uses: volttron/volttron-build-action@v1 + with: + python_version: ${{ matrix.python-version }} + os: ${{ matrix.os }} + test_path: volttrontesting/testutils + test_output_suffix: testutils - # Archive the results from the pytest to storage. +# Archive the results from the pytest to storage. - name: Archive test results uses: actions/upload-artifact@v2 if: always() with: name: pytest-report path: output/test-testutils-${{matrix.os}}-${{ matrix.python-version }}-results.xml - + # - name: Publish Unit Test Results # uses: EnricoMi/publish-unit-test-result-action@v1.5 # if: always() diff --git a/README.md b/README.md index 247f467f9d..c2ffe452e8 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,13 @@ ![image](docs/source/files/VOLLTRON_Logo_Black_Horizontal_with_Tagline.png) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/fcf58045b4804edf8f4d3ecde3016f76)](https://app.codacy.com/gh/VOLTTRON/volttron?utm_source=github.com&utm_medium=referral&utm_content=VOLTTRON/volttron&utm_campaign=Badge_Grade_Settings) +![example workflow](https://github.com/volttron/volttron/actions/workflows/pytest-testutils.yml/badge.svg) + VOLTTRON™ is an open source platform for distributed sensing and control. The platform provides services for collecting and storing data from buildings and devices and provides an environment for developing applications which interact with that data. -[![Build Status](https://travis-ci.org/VOLTTRON/volttron.svg?branch=develop)](https://travis-ci.org/VOLTTRON/volttron) ## Features @@ -150,7 +151,7 @@ You can deactivate the environment at any time by running `deactivate`. ##### 5. Create RabbitMQ setup for VOLTTRON: ```sh -vcfg --rabbitmq single [optional path to rabbitmq_config.yml] +vcfg rabbitmq single [--config optional path to rabbitmq_config.yml] ``` Refer to [examples/configurations/rabbitmq/rabbitmq_config.yml](examples/configurations/rabbitmq/rabbitmq_config.yml) @@ -180,9 +181,9 @@ be configured. The VOLTTRON instance name will be read from volttron_home/config if available, if not the user will be prompted for VOLTTRON instance name. To run the scripts without any prompts, save the VOLTTRON instance name in volttron_home/config file and pass the VOLTTRON home directory as a command line -argument. For example: `vcfg --vhome /home/vdev/.new_vhome --rabbitmq single` +argument. For example: `vcfg --vhome /home/vdev/.new_vhome rabbitmq single` -The Following are the example inputs for `vcfg --rabbitmq single` command. Since no +The Following are the example inputs for `vcfg rabbitmq single` command. Since no config file is passed the script prompts for necessary details. ```sh diff --git a/docs/source/deploying-volttron/multi-platform/files/cmd_line.png b/docs/source/deploying-volttron/multi-platform/files/cmd_line.png index 03945f113f..f9bfcbbecb 100644 Binary files a/docs/source/deploying-volttron/multi-platform/files/cmd_line.png and b/docs/source/deploying-volttron/multi-platform/files/cmd_line.png differ diff --git a/docs/source/deploying-volttron/multi-platform/multi-platform-rabbitmq-deployment.rst b/docs/source/deploying-volttron/multi-platform/multi-platform-rabbitmq-deployment.rst index 79544690eb..fe2943c6b4 100644 --- a/docs/source/deploying-volttron/multi-platform/multi-platform-rabbitmq-deployment.rst +++ b/docs/source/deploying-volttron/multi-platform/multi-platform-rabbitmq-deployment.rst @@ -4,7 +4,8 @@ Multi-platform RabbitMQ Deployment ================================== -With ZeroMQ based VOLTTRON, multi-platform communication was accomplished in three different ways: +With ZeroMQ based VOLTTRON, multi-platform communication was accomplished in three different ways described below. +Similar behavior can be accomplished with RabbitMQ-VOLTTRON as well. #. Direct connection to remote instance - Write an agent that would connect to a remote instance directly. @@ -26,10 +27,31 @@ The upstream server is the node that is publishing some message of interest; we The downstream server is the node that will receive messages from the upstream server; we shall refer to this node as the subscriber node. Note that upstream server & publisher node and downstream server & subscriber node will be used interchangeably for the rest of this guide. +Multi-Platform Communication With RabbitMQ SSL +---------------------------------------------- +RabbitMQ-VOLTTRON uses SSL based authentication for connection to the platform. This feature is extended to connection +between multiple VOLTTRON platforms. The below figure shows the 2 remote VOLTTRON platforms can establish authentication +connection to the other. + +.. image:: files/multiplatform_ssl.png + +Suppose there are two virtual machines (VOLTTRON1 and VOLTTRON2) running single instances of RabbitMQ; VOLTTRON1 and VOLTTRON2 +want to talk to each other via the federation or shovel plugins. For shovel/federation to have authenticated connection to the +remote instance, it needs to have it's public certificate signed by the remote instance's CA. So as part of the shovel +or federation creation steps, a certificate signing request is made to the remote instance. The admin of the remote instance +should be ready to accept/reject such a request through VOLTTRON's admin web interface. To facilitate this process, the +VOLTTRON platform exposes a web-based server API for requesting, listing, approving, and denying certificate requests. For +more detailed description, refer to :ref:`Agent communication to Remote RabbitMQ instance `. +After the CSR request is accepted, an authenticated shovel/federation connection can be established. + Using the Federation Plugin --------------------------- +.. note:: + Please make sure that a single instance of RabbitMQ VOLTTRON is setup before attempting to create a federation link + :ref:`platform installation steps for RMQ ` + Connecting multiple VOLTTRON instances can be done using the federation plugin. To create a RabbitMQ federation, we have to configure the downstream volttron instance to create federated exchange. A federated exchange links to other exchanges. In this case, the downstream federated exchange links to the upstream exchange. Conceptually, messages published to the @@ -65,19 +87,18 @@ To setup federation on the VOLTTRON instance, run the following command on the d .. code-block:: bash - vcfg --rabbitmq federation [optional path to rabbitmq_federation_config.yml] + vcfg rabbitmq federation [--config optional path to rabbitmq_federation_config.yml] [--max-retries optional maximum CSR retry attempt] -This establishes federation links to upstream servers. Once a federation link to the upstream server is established on -the downstream server, the messages published on the upstream server become available to the downstream server as if -it were published locally. +This establishes federation links to upstream servers. Here the default maximum retry attempt is set to 15. Once a federation link to the upstream server is established on +the downstream server, the messages published on the upstream server become available to the downstream server as if it were published locally. Multi-Platform RPC With Federation ---------------------------------- For multi-platform RPC communication, federation links need to be established on both the VOLTTRON -nodes. Once the federation links are established, RPC communication becomes fairly simple. +nodes. Once the federation links are established, RPC communication becomes fairly simple. .. image:: files/multiplatform_rpc.png @@ -133,22 +154,6 @@ VOLTTRON instance "volttron2" on host "host_B". First, a federation link needs .. _RabbitMQ-Multi-platform-SSL: -Multi-Platform Federation Communication With RabbitMQ SSL -========================================================= - -For multi-platform communication over federation, we need the connecting instances to trust each other. - -.. image:: files/multiplatform_ssl.png - -Suppose there are two virtual machines (VOLTTRON1 and VOLTTRON2) running single instances of RabbitMQ; VOLTTRON1 and VOLTTRON2 -want to talk to each other via the federation or shovel plugins. For shovel/federation to have authenticated connection to the -remote instance, it needs to have it's public certificate signed by the remote instance's CA. So as part of the shovel -or federation creation steps, a certificate signing request is made to the remote instance. The admin of the remote instance -should be ready to accept/reject such a request through VOLTTRON's admin web interface. To facilitate this process, the -VOLTTRON platform exposes a web-based server API for requesting, listing, approving, and denying certificate requests. For -more detailed description, refer to :ref:`Agent communication to Remote RabbitMQ instance `. -After the CSR request is accepted, an authenticated shovel/federation connection can be established. - Installation Steps ------------------ @@ -165,8 +170,8 @@ upstream servers on the downstream server and make the VOLTTRON exchange .. code-block:: bash - vcfg --rabbitmq federation [optional path to rabbitmq_federation_config.yml - containing the details of the upstream hostname, port and vhost.] + vcfg rabbitmq federation [--config optional path to rabbitmq_federation_config.yml + containing the details of the upstream hostname, port and vhost.] [--max-retries optional maximum CSR retry attempt] Example configuration for federation is available @@ -221,18 +226,6 @@ upstream servers on the downstream server and make the VOLTTRON exchange volttron2.volttron1.federation 172.20.0.2 APPROVED - c. Create a user in the upstream server (publisher) and provide it access to the virtual host of the upstream RabbitMQ server. - The username should take the form of .federation. - For example, if the downstream server name is "volttron1", and instance of local instance is "volttron2" then the instance name would be "volttron2.volttron1.federation". - Run the below command in the upstream server - - .. code-block:: bash - - vctl rabbitmq add-user - Do you want to set READ permission [Y/n] - Do you want to set WRITE permission [Y/n] - Do you want to set CONFIGURE permission [Y/n] - 5. Test the federation setup. a. On the downstream server run a listener agent which subscribes to messages from all platforms @@ -280,34 +273,35 @@ upstream servers on the downstream server and make the VOLTTRON exchange .. code-block:: bash - vctl rabbitmq list-federation-parameters - NAME URI - upstream-volttron2-volttron amqps://volttron2:5671/volttron?cacertfile=/home/volttron/vhome/test_fed/certificates/federation/volttron2_ca.crt&certfile=/home/volttron/vhome/test_fed/certificates/federation/volttron2.volttron1.federation.crt&keyfile=/home/volttron/vhome/test_fed/certificates/private/volttron1.federation.pem&verify=verify_peer&fail_if_no_peer_cert=true&auth_mechanism=external&server_name_indication=volttron2 + vctl rabbitmq list-federation-links + NAME STATUS + upstream-volttron2-volttron running Copy the upstream link name and run the below command to remove it. .. code-block:: bash - vctl rabbitmq remove-federation-parameters upstream-volttron2-volttron + vctl rabbitmq remove-federation-links upstream-volttron2-volttron + Do you wish to delete certificates as well? [Y/n] y + Removing certificate paths from VOLTTRON_HOME and from the config file .. note:: - These commands only remove the federation parameter from RabbitMQ and certificate entries from rabbitmq_federation_config.yml on the publisher node. - `It does not remove the actual certificates.` Rerunning the federation command for same setup will reuse the existing certificates. - If you need to rerun the federation command again for the same setup - and need to create fresh certificates, then you will need to manually remove public and private certificates. - Private certificates will be in - $VOLTTRON_HOME/certificates/private. Public certificates will be in two directories: - $VOLTTRON_HOME/certificates/federation and $VOLTTRON_HOME/certificates/certs. - Further, you should request the remote instance admin to delete earlier generated certificates through admin web - interface before a new CSR is sent for approval. + These commands removes the federation parameter from RabbitMQ, deletes the certificates from VOLTTRON_HOME and certificate entries from + rabbitmq_federation_config.yml on the publisher node. The remote admin must delete the remote certificates through admin web + interface. If you need to rerun the federation command again for the same setup, then a fresh CSR request is made to the remote instance. + The remote admin has to approve the new request as before. Using the Shovel Plugin ----------------------- +.. note:: + Please make sure that a single instance of RabbitMQ VOLTTRON is setup before attempting to create a shovel link + :ref:`platform installation steps for RMQ `. + Shovels act as well-written client applications which move messages from a source to a destination broker. The below configuration shows how to setup a shovel to forward PubSub messages or perform multi-platform RPC communication from a local (i.e. publisher node) to a remote instance (i.e. subscriber node). The configuration expects `hostname`, `port` and @@ -319,26 +313,28 @@ Path: `$VOLTTRON_HOME/rabbitmq_shovel_config.yml` # Mandatory parameters for shovel setup shovel: - rabbit-2: - port: '5671' - virtual-host: volttron - certificates: - csr: true - private_cert: "path to private key" # For example, /home/volttron/vhome/test_shovel/certificates/private/volttron1.shovelvolttron2.pem - public_cert: "path to public cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2.volttron1.shovelvolttron2.crt - remote_ca: "path to CA cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2_ca.crt - - # Configuration to forward pubsub topics - pubsub: - # Identity of agent that is publishing the topic - platform.driver: - - devices - # Configuration to make remote RPC calls - rpc: - # Remote instance name - volttron2: - # List of pair of agent identities (local caller, remote callee) - - [scheduler, platform.actuator] + volttron2: # remote hostname + https-port: 8443 + port: 5671 + shovel-user: volttron1.shovelvolttron2 #. + virtual-host: volttron + certificates: + private_cert: "path to private cert" # For example, /home/volttron/vhome/test_shovel/certificates/private/volttron1.shovelvolttron2.pem + public_cert: "path to public cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2.volttron1.shovelvolttron2.crt + remote_ca: "path to CA cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2_ca.crt + # Configuration to forward pubsub topics + pubsub: + # Identity of agent that is publishing the topic + platform.driver: + # Topic pattern to be forwarded + - devices + + # Configuration to make remote RPC calls + rpc: + # Remote instance name + volttron2: + # List of pair of agent identities (local caller, remote callee) + - [scheduler, platform.actuator] To forward PubSub messages, the topic and agent identity of the publisher agent is needed. To perform RPC, the instance name of the remote instance and agent identities of the local agent and remote agent are needed. @@ -347,7 +343,7 @@ To configure the VOLTTRON instance to setup shovel, run the following command on .. code-block:: bash - vcfg --rabbitmq shovel [optional path to rabbitmq_shovel_config.yml] + vcfg rabbitmq shovel [--config optional path to rabbitmq_shovel_config.yml] [--max-retries optional maximum CSR retry attempt] This sets up a shovel that forwards messages (either PubSub or RPC) from a local exchange to a remote exchange. @@ -424,7 +420,7 @@ creation process, a certificate signing request is made to the remote instance. accept or reject such a request through VOLTTRON admin web interface. If accepted, a bundle containing a certificate signed by the remote CA is sent as a response back to the local instance. Subsequently, shovel connection is established with these certificates. If the user already has certificates signed by the remote CA, then that will be used for -connection. Otherwise, the user can run the command ``vcfg --rabbitmq shovel`` and it will prompt the user to make a CSR request as part of shovel setup. +connection. Otherwise, the user can run the command ``vcfg rabbitmq shovel`` and it will prompt the user to make a CSR request as part of shovel setup. 1. Setup two VOLTTRON instances using the steps described in installation section. Please note that each instance should have a unique instance name. @@ -438,7 +434,7 @@ Please note that each instance should have a unique instance name. .. code-block:: bash - vcfg --rabbitmq shovel [optional path to rabbitmq_shovel_config.yml] + vcfg rabbitmq shovel [--config optional path to rabbitmq_shovel_config.yml] [--max-retries optional maximum CSR retry attempt] rabbitmq_shovel_config.yml should contain the details of the remote hostname, port, vhost, certificates for connecting to remote instance and list of topics to forward. @@ -500,15 +496,6 @@ Please note that each instance should have a unique instance name. .. image:: files/csr_accepted.png - c. Create a user in the subscriber node with username set to the publisher instance's - agent name (for example: volttron1-admin) and allow the shovel access to - the virtual host of the subscriber node. - - .. code-block:: bash - - cd $RABBITMQ_HOME - vctl rabbitmq add-user - 4. Test the shovel setup. a. Start VOLTTRON on publisher and subscriber nodes. @@ -547,150 +534,24 @@ Please note that each instance should have a unique instance name. .. code-block:: bash - vctl rabbitmq list-shovel-parameters - NAME SOURCE ADDRESS DESTINATION ADDRESS BINDING KEY - shovel-volttron2-devices amqps://volttron1:5671/volttron?cacertfile=/home/volttron/vhome/test_shovel/certificates/certs/volttron1-trusted-cas.crt&certfile=/home/volttron/vhome/test_shovel/certificates/certs/volttron1.platform.driver.crt&keyfile=/home/volttron/vhome/test_shovel/certificates/private/volttron1.platform.driver.pem&verify=verify_peer&fail_if_no_peer_cert=true&auth_mechanism=external&server_name_indication=volttron1 amqps://volttron2:5671/volttron?cacertfile=/home/volttron/vhome/test_shovel/certificates/shovels/volttron2_ca.crt&certfile=/home/volttron/vhome/test_shovel/certificates/shovels/volttron2.volttron1.shovelvolttron2.crt&keyfile=/home/volttron/vhome/test_shovel/certificates/private/volttron1.shovelvolttron2.pem&verify=verify_peer&fail_if_no_peer_cert=true&auth_mechanism=external&server_name_indication=volttron2 __pubsub__.volttron1.devices.# + vctl rabbitmq list-shovel-links + NAME STATUS SRC_URI DEST_URI SRC_EXCHANGE_KEY + + shovel-volttron2-devices running amqps://volttron1:5671/volttron amqps://volttron2:5671/volttron __pubsub__.volttron1.devices.# Copy the shovel name and run following command to remove it. .. code-block:: bash - vctl rabbitmq remove-shovel-parameters shovel-volttron2-devices - + vctl rabbitmq remove-shovel-links shovel-volttron2-devices + Do you wish to delete certificates as well? [Y/n] y + Removing certificate paths from VOLTTRON_HOME and from the config file + .. note:: - These commands only remove the shovel parameter from RabbitMQ and certificate entries from rabbitmq_shovel_config.yml on the publisher node. - `It does not remove the actual certificates.` Rerunning the shovel command for same setup will reuse the existing certificates. - But if you need to rerun the shovel command again for the same setup and need to create fresh certificates, then you will - need to manually remove public and private certificates. Private certificates will be in - $VOLTTRON_HOME/certificates/private. Public certificates will be in two directories: - $VOLTTRON_HOME/certificates/shovel and $VOLTTRON_HOME/certificates/certs. - Further, you should request the remote instance admin to delete earlier generated cert through the admin web - interface before a new CSR is sent for approval. - - -DataMover Communication ------------------------ - -The DataMover historian running on one instance makes RPC call to platform historian running on remote -instance to store data on remote instance. Platform historian agent returns response back to DataMover -agent. For such a request-response behavior, shovels need to be created on both instances. - -1. Please ensure that preliminary steps for multi-platform communication are completed (namely, - steps 1-3 described above) . - -2. To setup a data mover to send messages from local instance (say v1) to remote instance (say v2) - and back, we would need to setup shovels on both instances. - - Example of RabbitMQ shovel configuration on v1 - - .. code-block:: json - - shovel: - # hostname of remote machine - rabbit-2: - port: 5671 - certificates: - csr: true - private_cert: "path to private key" # For example, /home/volttron/vhome/test_shovel/certificates/private/volttron1.shovelvolttron2.pem - public_cert: "path to public cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2.volttron1.shovelvolttron2.crt - remote_ca: "path to CA cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2_ca.crt - rpc: - # Remote instance name - v2: - # List of pair of agent identities (local caller, remote callee) - - [data.mover, platform.historian] - virtual-host: v1 - - This says that DataMover agent on v1 wants to make RPC call to platform historian on v2. - - .. code-block:: bash - - vcfg --rabbitmq shovel [optional path to rabbitmq_shovel_config.yml - - - Example of RabbitMQ shovel configuration on v2 - - .. code-block:: json - - shovel: - # hostname of remote machine - rabbit-1: - port: 5671 - rpc: - # Remote instance name - v1: - # List of pair of agent identities (local caller, remote callee) - - [platform.historian, data.mover] - virtual-host: v2 - - This says that Hplatform historian on v2 wants to make RPC call to DataMover agent on v1. - - a. On v1, run below command to setup a shovel from v1 to v2. - - .. code-block:: bash - - vcfg --rabbitmq shovel [optional path to rabbitmq_shovel_config.yml - - b. Create a user on v2 with username set to remote agent's username - ( for example, v1.data.mover i.e., .) and allow - the shovel access to the virtual host of v2. - - .. code-block:: bash - - cd $RABBITMQ_HOME - vctl rabbitmq add-user - - c. On v2, run below command to setup a shovel from v2 to v1 - - .. code-block:: bash - - vcfg --rabbitmq shovel [optional path to rabbitmq_shovel_config.yml - - d. Create a user on v1 with username set to remote agent's username - ( for example, v2.patform.historian i.e., .) and allow - the shovel access to the virtual host of the v1. - - .. code-block:: bash - - cd $RABBITMQ_HOME - vctl rabbitmq add-user - -3. Start Platform driver agent on v1 - - .. code-block:: bash - - ./stop-volttron - vcfg --agent platform_driver - ./start-volttron - vctl start --tag platform_driver - -4. Install DataMover agent on v1. Contents of the install script can look like below. - - .. code-block:: bash - - #!/bin/bash - export CONFIG=$(mktemp /tmp/abc-script.XXXXXX) - cat > $CONFIG <`__. +Publications +============ + +VOLTTRON white papers and technical reports can be found at https://volttron.org/publications + .. |VOLTTRON| unicode:: VOLTTRON U+2122 diff --git a/docs/source/introduction/platform-install.rst b/docs/source/introduction/platform-install.rst index 411b9ae58e..878d9c5a22 100644 --- a/docs/source/introduction/platform-install.rst +++ b/docs/source/introduction/platform-install.rst @@ -293,7 +293,7 @@ Step 5 - Configure RabbitMQ setup for VOLTTRON .. code-block:: bash - vcfg --rabbitmq single [optional path to rabbitmq_config.yml] + vcfg rabbitmq single [--config optional path to rabbitmq_config.yml] A sample configuration file can be found in the VOLTTRON repository in **examples/configurations/rabbitmq/rabbitmq_config.yml**. At a minimum you will need to provide the host name and a unique common-name (under certificate-data) in the configuration file. @@ -332,7 +332,7 @@ exchange to capture unrouteable messages. The default behavior generates a certificate which is valid for a period of 1 year. -The Following are the example inputs for `vcfg --rabbitmq single` command. Since no config file is passed the script +The Following are the example inputs for `vcfg rabbitmq single` command. Since no config file is passed the script prompts for necessary details. .. code-block:: console diff --git a/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-ssl-auth.rst b/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-ssl-auth.rst index 3d9b1b6d58..06b9694157 100644 --- a/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-ssl-auth.rst +++ b/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-ssl-auth.rst @@ -90,7 +90,7 @@ The parameters of interest for SSL based configuration are We can then configure the VOLTTRON instance to use SSL based authentication with the below command: - vcfg --rabbitmq single + vcfg rabbitmq single [--config optional path to rabbitmq_config.yml] When one creates a single instance of RabbitMQ, the following is created / re-created in the VOLTTRON_HOME/certificates directory: diff --git a/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-volttron.rst b/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-volttron.rst index c7370c488f..5e224c8137 100644 --- a/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-volttron.rst +++ b/docs/source/platform-features/message-bus/rabbitmq/rabbitmq-volttron.rst @@ -73,7 +73,7 @@ To configure the VOLTTRON instance to use RabbitMQ message bus, run the followin .. code-block:: bash - vcfg --rabbitmq single [optional path to rabbitmq_config.yml] + vcfg rabbitmq single [--config optional path to rabbitmq_config.yml] At the end of the setup process, a RabbitMQ broker is setup to use the configuration provided. A new topic exchange for the VOLTTRON instance is created within the configured virtual host. diff --git a/docs/source/volttron-topics/troubleshooting/troubleshooting-rmq.rst b/docs/source/volttron-topics/troubleshooting/troubleshooting-rmq.rst index a471678a93..a044384ed3 100644 --- a/docs/source/volttron-topics/troubleshooting/troubleshooting-rmq.rst +++ b/docs/source/volttron-topics/troubleshooting/troubleshooting-rmq.rst @@ -74,7 +74,7 @@ There are few things that are essential for SSL certificates to work right. a. Please use a unique common-name for CA certificate for each VOLTTRON instance. This is configured under `certificate-data` in the `rabbitmq_config.yml` or if no yml file is used while configuring a VOLTTRON single - instance (using ``vcfg --rabbitmq single``). Certificate generated for agent will automatically get agent's VIP + instance (using ``vcfg rabbitmq single``). Certificate generated for agent will automatically get agent's VIP identity as the certificate's common-name b. The host name in the SSL certificate should match hostname used to access the server. For example, if the fully diff --git a/examples/configurations/rabbitmq/rabbitmq_federation_config.yml b/examples/configurations/rabbitmq/rabbitmq_federation_config.yml index 15233acd64..311a24d068 100644 --- a/examples/configurations/rabbitmq/rabbitmq_federation_config.yml +++ b/examples/configurations/rabbitmq/rabbitmq_federation_config.yml @@ -2,18 +2,18 @@ federation-upstream: volttron4: # hostname of upstream server port: '5671' + https-port: '8443' virtual-host: volttron4 certificates: - csr: true private_key: "path to private cert" # For example, /home/volttron/vhome/test_fed/certificates/private/volttron1.federation.pem public_cert: "path to public cert" # For example, /home/volttron/vhome/test_fed/certificates/federation/volttron2.volttron1.federation.crt remote_ca: "path to CA cert" # For example, /home/volttron/vhome/test_fed/certificates/federation/volttron2_ca.crt federation-user: volttron4.federation #.federation volttron5: # hostname of upstream server port: '5671' + https-port: '8443' virtual-host: volttron5 certificates: - csr: true private_key: "path to private cert" public_cert: "path to public cert" remote_ca: "path to CA cert" diff --git a/examples/configurations/rabbitmq/rabbitmq_shovel_config.yml b/examples/configurations/rabbitmq/rabbitmq_shovel_config.yml index 552b6a58a8..4cf61a4284 100644 --- a/examples/configurations/rabbitmq/rabbitmq_shovel_config.yml +++ b/examples/configurations/rabbitmq/rabbitmq_shovel_config.yml @@ -1,22 +1,24 @@ # Mandatory parameters for shovel setup shovel: - rabbit-2: - port: '5671' - virtual-host: volttron - certificates: - csr: true - private_cert: "path to private cert" # For example, /home/volttron/vhome/test_shovel/certificates/private/volttron1.shovelvolttron2.pem - public_cert: "path to public cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2.volttron1.shovelvolttron2.crt - remote_ca: "path to CA cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2_ca.crt - # Configuration to forward pubsub topics - pubsub: - # Identity of agent that is publishing the topic - platform.driver: - # Topic pattern to be forwarded - - devices - # Configuration to make remote RPC calls - rpc: - # Remote instance name - volttron2: - # List of pair of agent identities (local caller, remote callee) - - [scheduler, platform.actuator] + volttron2: # remote hostname + https-port: 8443 + port: 5671 + shovel-user: volttron1.shovelvolttron2 #. + virtual-host: volttron + certificates: + private_cert: "path to private cert" # For example, /home/volttron/vhome/test_shovel/certificates/private/volttron1.shovelvolttron2.pem + public_cert: "path to public cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2.volttron1.shovelvolttron2.crt + remote_ca: "path to CA cert" # For example, /home/volttron/vhome/test_shovel/certificates/shovels/volttron2_ca.crt + # Configuration to forward pubsub topics + pubsub: + # Identity of agent that is publishing the topic + platform.driver: + # Topic pattern to be forwarded + - devices + + # Configuration to make remote RPC calls + rpc: + # Remote instance name + volttron2: + # List of pair of agent identities (local caller, remote callee) + - [scheduler, platform.actuator] diff --git a/pytest.ini b/pytest.ini index c9d3405fe9..f3b8c2d4ba 100644 --- a/pytest.ini +++ b/pytest.ini @@ -63,4 +63,6 @@ markers = sqlitefuncts: level one integration tests for sqlitefuncts unit: Run all unit/level one integration tests influxdbutils: level one integration tests for influxdb + federation: Tests for rabbitmq federation communication + shovel: Tests for rabbitmq shovel communication contrib: tests for community-contributed agents diff --git a/services/ops/MessageDebuggerAgent/IDENTITY b/services/contrib/MessageDebuggerAgent/IDENTITY similarity index 100% rename from services/ops/MessageDebuggerAgent/IDENTITY rename to services/contrib/MessageDebuggerAgent/IDENTITY diff --git a/services/ops/MessageDebuggerAgent/README.md b/services/contrib/MessageDebuggerAgent/README.md similarity index 100% rename from services/ops/MessageDebuggerAgent/README.md rename to services/contrib/MessageDebuggerAgent/README.md diff --git a/services/ops/MessageDebuggerAgent/conftest.py b/services/contrib/MessageDebuggerAgent/conftest.py similarity index 100% rename from services/ops/MessageDebuggerAgent/conftest.py rename to services/contrib/MessageDebuggerAgent/conftest.py diff --git a/services/ops/MessageDebuggerAgent/messagedebugger.config b/services/contrib/MessageDebuggerAgent/messagedebugger.config similarity index 100% rename from services/ops/MessageDebuggerAgent/messagedebugger.config rename to services/contrib/MessageDebuggerAgent/messagedebugger.config diff --git a/services/ops/MessageDebuggerAgent/messagedebugger/__init__.py b/services/contrib/MessageDebuggerAgent/messagedebugger/__init__.py similarity index 100% rename from services/ops/MessageDebuggerAgent/messagedebugger/__init__.py rename to services/contrib/MessageDebuggerAgent/messagedebugger/__init__.py diff --git a/services/ops/MessageDebuggerAgent/messagedebugger/agent.py b/services/contrib/MessageDebuggerAgent/messagedebugger/agent.py similarity index 100% rename from services/ops/MessageDebuggerAgent/messagedebugger/agent.py rename to services/contrib/MessageDebuggerAgent/messagedebugger/agent.py diff --git a/services/ops/MessageDebuggerAgent/messageviewer/__init__.py b/services/contrib/MessageDebuggerAgent/messageviewer/__init__.py similarity index 100% rename from services/ops/MessageDebuggerAgent/messageviewer/__init__.py rename to services/contrib/MessageDebuggerAgent/messageviewer/__init__.py diff --git a/services/ops/MessageDebuggerAgent/messageviewer/viewer.py b/services/contrib/MessageDebuggerAgent/messageviewer/viewer.py similarity index 100% rename from services/ops/MessageDebuggerAgent/messageviewer/viewer.py rename to services/contrib/MessageDebuggerAgent/messageviewer/viewer.py diff --git a/services/ops/MessageDebuggerAgent/requirements.txt b/services/contrib/MessageDebuggerAgent/requirements.txt similarity index 100% rename from services/ops/MessageDebuggerAgent/requirements.txt rename to services/contrib/MessageDebuggerAgent/requirements.txt diff --git a/services/ops/MessageDebuggerAgent/setup.py b/services/contrib/MessageDebuggerAgent/setup.py similarity index 100% rename from services/ops/MessageDebuggerAgent/setup.py rename to services/contrib/MessageDebuggerAgent/setup.py diff --git a/services/ops/MessageDebuggerAgent/tests/sqlalchemy_dry_run.py b/services/contrib/MessageDebuggerAgent/tests/sqlalchemy_dry_run.py similarity index 100% rename from services/ops/MessageDebuggerAgent/tests/sqlalchemy_dry_run.py rename to services/contrib/MessageDebuggerAgent/tests/sqlalchemy_dry_run.py diff --git a/services/ops/MessageDebuggerAgent/tests/test_message_debugging.py b/services/contrib/MessageDebuggerAgent/tests/test_message_debugging.py similarity index 99% rename from services/ops/MessageDebuggerAgent/tests/test_message_debugging.py rename to services/contrib/MessageDebuggerAgent/tests/test_message_debugging.py index 6785acb561..9d2dd3879f 100644 --- a/services/ops/MessageDebuggerAgent/tests/test_message_debugging.py +++ b/services/contrib/MessageDebuggerAgent/tests/test_message_debugging.py @@ -82,7 +82,7 @@ class TestMessageDebugger: Regression tests for the MessageDebuggerAgent. """ - # @pytest.mark.skip(reason="Dependency on SQLAlchemy library") + @pytest.mark.skip(reason="Community contributed agent") def test_rpc_calls(self, agent): """Test the full range of RPC calls to the MessageDebuggerAgent, except those related to streaming.""" diff --git a/services/ops/FileWatchPublisher/Tests/test_file_watcher.py b/services/ops/FileWatchPublisher/Tests/test_file_watcher.py index 8e5493c5f3..4250402487 100644 --- a/services/ops/FileWatchPublisher/Tests/test_file_watcher.py +++ b/services/ops/FileWatchPublisher/Tests/test_file_watcher.py @@ -46,26 +46,12 @@ from volttron.platform import get_ops, get_home from volttron.platform.vip.agent import Agent -test_path = os.path.join(get_home(), "test.txt") - -test_config = { - "files": [ - { - "file": test_path, - "topic": "platform/test_topic" - } - ] -} - @pytest.fixture(scope="module") def publish_agent(request, volttron_instance): # 1: Start a fake agent to publish to message bus agent = volttron_instance.build_agent(identity='test-agent') - with open(test_path, "w") as textfile: - textfile.write("test_data") - agent.callback = MagicMock(name="callback") agent.callback.reset_mock() @@ -75,7 +61,6 @@ def stop_agent(): print("In teardown method of publish_agent") if isinstance(agent, Agent): agent.core.stop() - os.remove(test_path) request.addfinalizer(stop_agent) return agent @@ -98,18 +83,33 @@ def test_default_config(volttron_instance, publish_agent): volttron_instance.remove_agent(watcher_uuid) -# def test_file_watcher(volttron_instance, publish_agent): -# watcher_uuid = volttron_instance.install_agent( -# agent_dir=get_ops("FileWatchPublisher"), -# config_file=test_config, -# start=True, -# vip_identity="health_test") -# -# with open(test_path, "w+") as textfile: -# textfile.write("more test_data") -# -# gevent.sleep(1) -# -# assert publish_agent.callback.call_count == 1 -# print(publish_agent.callback.call_args) -# volttron_instance.remove_agent(watcher_uuid) +def test_file_watcher(volttron_instance, publish_agent): + test_path = os.path.join(get_home(), "test.txt") + + with open(test_path, "w") as textfile: + textfile.write("test_data") + + test_config = { + "files": [ + { + "file": test_path, + "topic": "platform/test_topic" + } + ] + } + + watcher_uuid = volttron_instance.install_agent( + agent_dir=get_ops("FileWatchPublisher"), + config_file=test_config, + start=True, + vip_identity="health_test") + + with open(test_path, "w+") as textfile: + textfile.write("more test_data") + + gevent.sleep(1) + + assert publish_agent.callback.call_count == 1 + print(publish_agent.callback.call_args) + volttron_instance.remove_agent(watcher_uuid) + os.remove(test_path) diff --git a/services/ops/LogStatisticsAgent/Tests/test_log_statistics.py b/services/ops/LogStatisticsAgent/Tests/test_log_statistics.py index 8aeeeeee65..284cbd58c2 100644 --- a/services/ops/LogStatisticsAgent/Tests/test_log_statistics.py +++ b/services/ops/LogStatisticsAgent/Tests/test_log_statistics.py @@ -45,25 +45,29 @@ from volttron.platform.vip.agent import Agent from volttron.platform import get_ops, get_home -# TODO fix finding test logs test_config = { - "file_path": os.path.join(get_home(), "volttron.log"), "analysis_interval_sec": 2, "publish_topic": "platform/log_statistics", "historian_topic": "analysis/log_statistics" } - @pytest.fixture(scope="module") def publish_agent(request, volttron_instance): + test_config = { + "file_path": os.path.join(volttron_instance.volttron_home, "volttron.log"), + "analysis_interval_sec": 2, + "publish_topic": "platform/log_statistics", + "historian_topic": "analysis/log_statistics" + } # 1: Start a fake agent to publish to message bus agent = volttron_instance.build_agent() - publish_agent.callback = MagicMock(name="callback") - publish_agent.callback.reset_mock() + agent.callback = MagicMock(name="callback") + agent.callback.reset_mock() - agent.vip.pubsub.subscribe(peer='pubsub', prefix=test_config.get("publish_topic"), - callback=publish_agent.callback).get() + agent.vip.pubsub.subscribe(peer='pubsub', + prefix=test_config.get("publish_topic"), + callback=agent.callback).get() def stop_agent(): print("In teardown method of publish_agent") @@ -91,20 +95,24 @@ def test_default_config(volttron_instance, publish_agent): volttron_instance.remove_agent(stats_uuid) -# def test_log_stats(volttron_instance, publish_agent): -# stats_uuid = volttron_instance.install_agent( -# agent_dir=get_ops("LogStatisticsAgent"), -# config_file=test_config, -# start=True, -# vip_identity="health_test") -# -# gevent.sleep(1) -# -# # building another agent should populate the logs -# volttron_instance.build_agent(identity="log_populate") -# -# gevent.sleep(2) -# -# # TODO do mock asserts -# -# volttron_instance.remove_agent(stats_uuid) +def test_log_stats(volttron_instance, publish_agent): + test_config["file_path"] = volttron_instance.log_path + print(f"File path: {test_config['file_path']}") + + stats_uuid = volttron_instance.install_agent( + agent_dir=get_ops("LogStatisticsAgent"), + config_file=test_config, + start=True, + vip_identity="health_test") + + import gevent + gevent.sleep(1) + + # building another agent should populate the logs + volttron_instance.build_agent(identity="log_populate") + + gevent.sleep(1) + + assert publish_agent.callback.call_count >= 1 + + volttron_instance.remove_agent(stats_uuid) diff --git a/services/ops/SysMonAgent/tests/test_sysmonagent.py b/services/ops/SysMonAgent/tests/test_sysmonagent.py index be405c835e..8cc07cf5f0 100644 --- a/services/ops/SysMonAgent/tests/test_sysmonagent.py +++ b/services/ops/SysMonAgent/tests/test_sysmonagent.py @@ -102,8 +102,8 @@ def add_topic(peer, sender, bus, topic, headers, messages): agent.vip.pubsub.subscribe('pubsub', base_topic, callback=add_topic) - max_wait = 1 + max(value for key, value in _test_config.items() if key.endswith('_interval')) - + max_wait = 1 + max(value for key, value in _test_config.items() if key.endswith('_interval')) + 8 + print(f"Max wait: {max_wait}, topics: {topics}, seen_topics: {seen_topics}") assert poll_gevent_sleep(max_wait, lambda: set(topics) <= seen_topics) @@ -124,6 +124,7 @@ def test_reconfigure_then_listen(sysmon_tester_agent): listen(sysmon_tester_agent, new_config) +@pytest.mark.dev def test_default_config(sysmon_tester_agent): """ Test that the topic can be reconfigured diff --git a/services/ops/TopicWatcher/tests/test_remote_topic_watcher.py b/services/ops/TopicWatcher/tests/test_remote_topic_watcher.py index 84c0704b99..1dcb30e142 100644 --- a/services/ops/TopicWatcher/tests/test_remote_topic_watcher.py +++ b/services/ops/TopicWatcher/tests/test_remote_topic_watcher.py @@ -182,4 +182,5 @@ def onmessage(peer, sender, bus, topic, headers, message): u"Topic(s) not published within time limit: [('fakedevice2/all', 'point'), " \ u"'fakedevice2/all', 'fakedevice']" in \ alert_messages + alert_messages.clear() diff --git a/volttron/platform/__init__.py b/volttron/platform/__init__.py index 38097b710c..2ade1daebc 100644 --- a/volttron/platform/__init__.py +++ b/volttron/platform/__init__.py @@ -213,6 +213,17 @@ def is_rabbitmq_available(): return rabbitmq_available +def is_web_available(): + web_available = True + try: + import jwt + from jinja2 import Environment, FileSystemLoader, select_autoescape + from ws4py.server.geventserver import WSGIServer + except ImportError: + web_available = False + return web_available + + __config__ = None diff --git a/volttron/platform/control.py b/volttron/platform/control.py index 5b389c0c65..95f650a76e 100644 --- a/volttron/platform/control.py +++ b/volttron/platform/control.py @@ -296,6 +296,7 @@ def stop_platform(self): @RPC.export def list_agents(self): + _log.info("CONTROL RPC list_agents") tag = self._aip.agent_tag priority = self._aip.agent_priority return [{'name': name, 'uuid': uuid, @@ -2381,6 +2382,64 @@ def list_shovel_parameters(opts): _stdout.write("Error in getting shovel parameters") +def list_fed_links(opts): + links = None + try: + links = rmq_mgmt.get_federation_links() + except requests.exceptions.HTTPError as e: + _stdout.write("No Federation links Found \n") + return + except ConnectionError as e: + _stdout.write("Error making request to RabbitMQ Management interface.\n" + "Check Connection Parameters: {} \n".format(e)) + return + try: + if links: + name_width = max(5, max(len(lk['name']) for lk in links)) + status_width = max(3, max(len(lk['status']) for lk in links)) + fmt = '{:{}} {:{}}\n' + _stderr.write( + fmt.format('NAME', name_width, 'STATUS', status_width)) + for link in links: + _stdout.write(fmt.format(link['name'], name_width, + link['status'], status_width)) + except (AttributeError, KeyError) as ex: + _stdout.write("Error in federation links") + + +def list_shovel_links(opts): + links = None + try: + links = rmq_mgmt.get_shovel_links() + except requests.exceptions.HTTPError as e: + _stdout.write("No Shovel links Found \n") + return + except ConnectionError as e: + _stdout.write("Error making request to RabbitMQ Management interface.\n" + "Check Connection Parameters: {} \n".format(e)) + return + try: + if links: + name_width = max(5, max(len(lk['name']) for lk in links)) + status_width = max(3, max(len(lk['status']) for lk in links)) + src_exchange_key_width = max(3, max(len(lk['src_exchange_key']) for lk in links)) + src_uri_width = max(3, max(len(lk['src_uri']) for lk in links)) + dest_uri_width = max(3, max(len(lk['dest_uri']) for lk in links)) + fmt = '{:{}} {:{}} {:{}} {:{}} {:{}}\n' + _stderr.write( + fmt.format('NAME', name_width, 'STATUS', status_width, 'SRC_URI', + src_uri_width, 'DEST_URI', dest_uri_width, + 'SRC_EXCHANGE_KEY', src_exchange_key_width)) + for link in links: + _stdout.write(fmt.format(link['name'], name_width, + link['status'], status_width, + link['src_uri'], src_uri_width, + link['dest_uri'], dest_uri_width, + link['src_exchange_key'], src_exchange_key_width)) + except (AttributeError, KeyError) as ex: + _stdout.write(f"Error in shovel links as {ex}") + + def list_bindings(opts): bindings = None try: @@ -2483,7 +2542,8 @@ def remove_queues(opts): def remove_fed_parameters(opts): try: for param in opts.parameters: - rmq_mgmt.delete_multiplatform_parameter('federation-upstream', param) + delete_certs = _ask_yes_no(f'Do you wish to delete certificates as well for {param}?') + rmq_mgmt.delete_multiplatform_parameter('federation-upstream', param, delete_certs=delete_certs) except requests.exceptions.HTTPError as e: _stdout.write("No Federation Parameters Found {} \n".format(opts.parameters)) except ConnectionError as e: @@ -2494,7 +2554,8 @@ def remove_fed_parameters(opts): def remove_shovel_parameters(opts): try: for param in opts.parameters: - rmq_mgmt.delete_multiplatform_parameter('shovel', param) + delete_certs = _ask_yes_no('Do you wish to delete certificates as well?') + rmq_mgmt.delete_multiplatform_parameter('shovel', param, delete_certs=delete_certs) except requests.exceptions.HTTPError as e: _stdout.write("No Shovel Parameters Found {} \n".format(opts.parameters)) except ConnectionError as e: @@ -3123,6 +3184,14 @@ def add_parser(*args, **kwargs) -> argparse.ArgumentParser: subparser=rabbitmq_subparsers) rabbitmq_list_fed_parameters.set_defaults(func=list_fed_parameters) + rabbitmq_list_fed_links = add_parser('list-federation-links', help='list all federation links', + subparser=rabbitmq_subparsers) + rabbitmq_list_fed_links.set_defaults(func=list_fed_links) + + rabbitmq_list_shovel_links = add_parser('list-shovel-links', help='list all Shovel links', + subparser=rabbitmq_subparsers) + rabbitmq_list_shovel_links.set_defaults(func=list_shovel_links) + rabbitmq_list_shovel_parameters = add_parser('list-shovel-parameters', help='list all shovel parameters', subparser=rabbitmq_subparsers) rabbitmq_list_shovel_parameters.set_defaults(func=list_shovel_parameters) @@ -3152,13 +3221,13 @@ def add_parser(*args, **kwargs) -> argparse.ArgumentParser: rabbitmq_remove_queues.add_argument('queues', nargs='+', help='Queue') rabbitmq_remove_queues.set_defaults(func=remove_queues) - rabbitmq_remove_fed_parameters = add_parser('remove-federation-parameters', + rabbitmq_remove_fed_parameters = add_parser('remove-federation-links', help='Remove federation parameter', subparser=rabbitmq_subparsers) rabbitmq_remove_fed_parameters.add_argument('parameters', nargs='+', help='parameter name/s') rabbitmq_remove_fed_parameters.set_defaults(func=remove_fed_parameters) - rabbitmq_remove_shovel_parameters = add_parser('remove-shovel-parameters', + rabbitmq_remove_shovel_parameters = add_parser('remove-shovel-links', help='Remove shovel parameter', subparser=rabbitmq_subparsers) rabbitmq_remove_shovel_parameters.add_argument('parameters', nargs='+', help='parameter name/s') diff --git a/volttron/platform/instance_setup.py b/volttron/platform/instance_setup.py index 591bb0c2ed..4f24a684cb 100644 --- a/volttron/platform/instance_setup.py +++ b/volttron/platform/instance_setup.py @@ -941,29 +941,52 @@ def wizard(): print('the config file is at {}/config\n'.format(volttron_home)) -def process_rmq_inputs(args, instance_name=None): +def process_rmq_inputs(args_dict, instance_name=None): + #print(f"args_dict:{args_dict}, args") if not is_rabbitmq_available(): raise RuntimeError("Rabbitmq Dependencies not installed please run python bootstrap.py --rabbitmq") confirm_volttron_home() - if len(args) == 2: - vhome = get_home() - if args[0] == 'single': + vhome = get_home() + + if args_dict['config'] is not None: + if not os.path.exists(vhome): + os.makedirs(vhome, 0o755) + if args_dict['installation-type'] == 'single': vhome_config = os.path.join(vhome, 'rabbitmq_config.yml') - elif args[0] == 'federation': + if args_dict['config'] != vhome_config: + copy(args_dict['config'], vhome_config) + elif args_dict['installation-type'] == 'federation': vhome_config = os.path.join(vhome, 'rabbitmq_federation_config.yml') - elif args[0] == 'shovel': + if os.path.exists(vhome_config): + prompt = f"rabbitmq_federation_config.yml already exists in VOLTTRON_HOME: {vhome}.\n" \ + "Do you wish to use this config file? If no, rabbitmq_federation_config.yml \n" \ + "will be replaced with new config file" + prompt = prompt_response(prompt, + valid_answers=y_or_n, + default='N') + if prompt in n: + copy(args_dict['config'], vhome_config) + else: + r = copy(args_dict['config'], vhome_config) + elif args_dict['installation-type'] == 'shovel': vhome_config = os.path.join(vhome, 'rabbitmq_shovel_config.yml') + if os.path.exists(vhome_config): + prompt = f"rabbitmq_shovel_config.yml already exists in VOLTTRON_HOME: {vhome}.\n" \ + "Do you wish to use this config file? If no, rabbitmq_shovel_config.yml \n" \ + "will be replaced with new config file" + prompt = prompt_response(prompt, + valid_answers=y_or_n, + default='N') + if prompt in n: + copy(args_dict['config'], vhome_config) + else: + r = copy(args_dict['config'], vhome_config) else: - print("Invalid argument. \nUsage: vcf --rabbitmq single|federation|shovel " - "[optional path to rabbitmq config yml]") + print("Invalid installation type. Acceptable values single|federation|shovel") sys.exit(1) - if args[1] != vhome_config: - if not os.path.exists(vhome): - os.makedirs(vhome, 0o755) - copy(args[1], vhome_config) - setup_rabbitmq_volttron(args[0], verbose, instance_name=instance_name) + setup_rabbitmq_volttron(args_dict['installation-type'], verbose, instance_name=instance_name, max_retries=args_dict['max_retries']) else: - setup_rabbitmq_volttron(args[0], verbose, prompt=True, instance_name=instance_name) + setup_rabbitmq_volttron(args_dict['installation-type'], verbose, prompt=True, instance_name=instance_name, max_retries=args_dict['max_retries']) def main(): @@ -972,27 +995,35 @@ def main(): parser.add_argument('-v', '--verbose', action='store_true') parser.add_argument('--vhome', help="Path to volttron home") parser.add_argument('--instance-name', dest='instance_name', help="Name of this volttron instance") - + parser.set_defaults(is_rabbitmq=False) group = parser.add_mutually_exclusive_group() agent_list = '\n\t' + '\n\t'.join(sorted(available_agents.keys())) group.add_argument('--list-agents', action='store_true', dest='list_agents', help='list configurable agents{}'.format(agent_list)) + rabbitmq_parser = parser.add_subparsers(title='rabbitmq', + metavar='', + dest='parser_name') + single_parser = rabbitmq_parser.add_parser('rabbitmq', help='Configure rabbitmq for single instance, ' + 'federation, or shovel either based on ' + 'configuration file in yml format or providing ' + 'details when prompted. \nUsage: vcfg rabbitmq ' + 'single|federation|shovel --config --max-retries ]') + single_parser.add_argument('installation-type', default='single', help='Rabbitmq option for installation. Installation type can be single|federation|shovel') + single_parser.add_argument('--max-retries', help='Optional Max retry attempt', type=int, default=12) + single_parser.add_argument('--config', help='Optional path to rabbitmq config yml', type=str) + single_parser.set_defaults(is_rabbitmq=True) group.add_argument('--agent', nargs='+', help='configure listed agents') - group.add_argument('--rabbitmq', nargs='+', - help='Configure rabbitmq for single instance, ' - 'federation, or shovel either based on ' - 'configuration file in yml format or providing ' - 'details when prompted. \nUsage: vcfg --rabbitmq ' - 'single|federation|shovel [rabbitmq config ' - 'file]') + group.add_argument('--secure-agent-users', action='store_true', dest='secure_agent_users', help='Require that agents run with their own users (this requires running ' 'scripts/secure_user_permissions.sh as sudo)') args = parser.parse_args() + verbose = args.verbose # Protect against configuration of base logger when not the "main entry point" if verbose: @@ -1013,25 +1044,12 @@ def main(): _update_config_file(instance_name=args.instance_name) if args.list_agents: print("Agents available to configure:{}".format(agent_list)) - elif args.rabbitmq: - if len(args.rabbitmq) > 2: - print("vcfg --rabbitmq can at most accept 2 arguments") - parser.print_help() - sys.exit(1) - elif args.rabbitmq[0] not in ['single', 'federation', 'shovel']: - print("Usage: vcf --rabbitmq single|federation|shovel " - "[optional path to rabbitmq config yml]") - parser.print_help() - sys.exit(1) - elif len(args.rabbitmq) == 2 and not os.path.exists(args.rabbitmq[1]): - print("Invalid rabbitmq configuration file path.") - parser.print_help() - sys.exit(1) - else: - process_rmq_inputs(args.rabbitmq, args.instance_name) + elif args.secure_agent_users: config_opts['secure-agent-users'] = args.secure_agent_users _update_config_file() + elif args.is_rabbitmq: + process_rmq_inputs(vars(args)) elif not args.agent: wizard() diff --git a/volttron/platform/main.py b/volttron/platform/main.py index f81db6f07f..c22a164543 100644 --- a/volttron/platform/main.py +++ b/volttron/platform/main.py @@ -963,7 +963,6 @@ def rmq_router(stop): del event protected_topics = auth.get_protected_topics() - # Spawn Greenlet friendly ZMQ router # Necessary for backward compatibility with ZMQ message bus green_router = GreenRouter(opts.vip_local_address, opts.vip_address, @@ -1029,7 +1028,6 @@ def rmq_router(stop): enable_store=False, message_bus='zmq') ] - entry = AuthEntry(credentials=services[0].core.publickey, user_id=CONTROL, capabilities=[{'edit_config_store': {'identity': '/.*/'}}, @@ -1040,6 +1038,7 @@ def rmq_router(stop): # Begin the webserver based options here. if opts.bind_web_address is not None: if not HAS_WEB: + _log.info(f"Web libraries not installed, but bind web address specified\n") sys.stderr.write("Web libraries not installed, but bind web address specified\n") sys.stderr.write("Please install web libraries using python3 bootstrap.py --web\n") sys.exit(-1) @@ -1090,7 +1089,6 @@ def rmq_router(stop): # capabilities=['allow_auth_modifications'], # comments='Automatically added by platform on start') # AuthFile().add(entry, overwrite=True) - health_service = HealthService(address=address, identity=PLATFORM_HEALTH, heartbeat_autostart=True, enable_store=False, diff --git a/volttron/platform/vip/rmq_connection.py b/volttron/platform/vip/rmq_connection.py index 3724802a74..1f97d58139 100644 --- a/volttron/platform/vip/rmq_connection.py +++ b/volttron/platform/vip/rmq_connection.py @@ -360,7 +360,7 @@ def _send_via_rmq(self, destination_routing_key, subsystem, args, msg_id, user): # Fit VIP frames in the PIKA properties dict # VIP format - [SENDER, RECIPIENT, PROTO, USER_ID, MSG_ID, SUBSYS, ARGS...] dct = { - 'user_id': self._rmq_userid, + #'user_id': self._rmq_userid, 'app_id': self.routing_key, # Routing key of SENDER 'headers': dict( recipient=destination_routing_key, # RECEIVER diff --git a/volttron/platform/web/admin_endpoints.py b/volttron/platform/web/admin_endpoints.py index 7dc7884a9e..c386695863 100644 --- a/volttron/platform/web/admin_endpoints.py +++ b/volttron/platform/web/admin_endpoints.py @@ -42,6 +42,7 @@ from urllib.parse import parse_qs from volttron.platform.agent.known_identities import PLATFORM_WEB, AUTH +from volttron.platform.jsonrpc import RemoteError try: from jinja2 import Environment, FileSystemLoader, select_autoescape, TemplateNotFound @@ -160,6 +161,14 @@ def verify_and_dispatch(self, env, data): except NotAuthorized: _log.error("Unauthorized user attempted to connect to {}".format(env.get('PATH_INFO'))) return Response('

Unauthorized User

', status="401 Unauthorized") + except RemoteError as e: + if "ExpiredSignatureError" in e.exc_info["exc_type"]: + _log.warning("Access token has expired! Please re-login to renew.") + template = template_env(env).get_template('login.html') + _log.debug("Login.html: {}".format(env.get('PATH_INFO'))) + return Response(template.render(), content_type='text/html') + else: + _log.error(e) # Make sure we have only admins for viewing this. if 'admin' not in claims.get('groups'): diff --git a/volttron/platform/web/csr_endpoints.py b/volttron/platform/web/csr_endpoints.py index c0e5b7deb4..1165add805 100644 --- a/volttron/platform/web/csr_endpoints.py +++ b/volttron/platform/web/csr_endpoints.py @@ -48,7 +48,6 @@ def get_routes(self): ] def _csr_request_new(self, env, data): - _log.debug("New csr request") if not isinstance(data, dict): try: @@ -86,7 +85,10 @@ def _csr_request_new(self, env, data): else: try: cert = self._certs.approve_csr(identity) - permissions = self._core().rmq_mgmt.get_default_permissions(identity) + #permissions = self._core().rmq_mgmt.get_default_permissions(identity) + _log.debug(f"CREATING NEW RMQ USER: {identity}") + permissions = dict(configure=".*", read=".*", + write=".*") self._core().rmq_mgmt.create_user_with_permissions(identity, permissions, True) diff --git a/volttron/utils/rmq_config_params.py b/volttron/utils/rmq_config_params.py index aa639ee7d7..554eaa4023 100644 --- a/volttron/utils/rmq_config_params.py +++ b/volttron/utils/rmq_config_params.py @@ -58,9 +58,9 @@ def read_config_file(filename): with open(filename, 'r') as yaml_file: data = yaml.safe_load(yaml_file) except IOError as exc: - _log.error("Error reading from file: {}".format(filename)) + _log.error(f"Error reading from file: {filename}, Exception: {exc}") except yaml.YAMLError as exc: - _log.error("Yaml Error: {}".format(filename)) + _log.error(f"Yaml Error: {filename}. Exception: {exc}") return data @@ -69,9 +69,9 @@ def write_to_config_file(filename, data): with open(filename, 'w') as yaml_file: yaml.dump(data, yaml_file, default_flow_style=False) except IOError as exc: - _log.error("Error writing to file: {}".format(filename)) + _log.error(f"Error writing to file: {filename}. Exception: {exc}") except yaml.YAMLError as exc: - _log.error("Yaml Error: {}".format(filename)) + _log.error(f"Yaml Error: {filename}. Exception: {exc}") class RMQConfig(object): @@ -129,7 +129,7 @@ def load_rmq_config(self, volttron_home=None): :return: """ """Loads the config file if the path exists.""" - + _log.debug(f"RMQConfig: {self.volttron_rmq_config}") with open(self.volttron_rmq_config, 'r') as yaml_file: self.config_opts = yaml.safe_load(yaml_file) if self.config_opts.get('rmq-home'): diff --git a/volttron/utils/rmq_mgmt.py b/volttron/utils/rmq_mgmt.py index 9b82d07a34..657e0ed0bf 100644 --- a/volttron/utils/rmq_mgmt.py +++ b/volttron/utils/rmq_mgmt.py @@ -691,6 +691,65 @@ def get_bindings(self, exchange, ssl_auth=None): response = self._http_get_request(url, ssl_auth) return response + def get_federation_links(self, ssl_auth=None): + """ + List all federation links for a given virtual host + :param ssl: Flag for SSL connection + :return: list of federation links + """ + ssl_auth = ssl_auth if ssl_auth is not None else self.is_ssl + url = '/api/federation-links/{vhost}'.format( + vhost=self.rmq_config.virtual_host) + response = self._http_get_request(url, ssl_auth) + links = [] + if response: + for res in response: + lk = dict() + lk['name'] = res['upstream'] + lk['status'] = res.get('status', 'Error in link') + links.append(lk) + return links + + def get_shovel_link_status(self, name, ssl_auth=None): + state = 'error' + links = self.get_shovel_links(ssl_auth=ssl_auth) + for link in links: + if link['name'] == name: + state = link['status'] + break + return state + + def get_federation_link_status(self, name, ssl_auth=None): + state = 'error' + links = self.get_federation_links(ssl_auth=ssl_auth) + for link in links: + if link['name'] == name: + state = link['status'] + break + return state + + def get_shovel_links(self, ssl_auth=None): + """ + List all shovel links for a given virtual host + :param ssl: Flag for SSL connection + :return: list of federation links + """ + ssl_auth = ssl_auth if ssl_auth is not None else self.is_ssl + url = '/api/shovels/{vhost}'.format( + vhost=self.rmq_config.virtual_host) + response = self._http_get_request(url, ssl_auth) + links = [] + if response: + for res in response: + lk = dict() + lk['name'] = res['name'] + lk['status'] = res.get('state', 'Error in link') + lk['src_uri'] = res.get('src_uri', '') + lk['dest_uri'] = res.get('dest_uri', '') + lk['src_exchange_key'] = res.get('src_exchange_key', '') + links.append(lk) + return links + # We need http address and port def init_rabbitmq_setup(self): """ @@ -750,7 +809,7 @@ def is_valid_mgmt_port(port): return port == 15672 or port == 15671 - def delete_multiplatform_parameter(self, component, parameter_name, vhost=None): + def delete_multiplatform_parameter(self, component, parameter_name, vhost=None, delete_certs=False): """ Delete a component parameter :param component: component name @@ -758,6 +817,32 @@ def delete_multiplatform_parameter(self, component, parameter_name, vhost=None): :param vhost: virtual host :return: """ + shovel_names_for_host = [] + self.delete_parameter(component, parameter_name, vhost, + ssl_auth=self.rmq_config.is_ssl) + print(f"Deleted {component} parameter: {parameter_name}") + + try: + if component == 'shovel': + parameter_parts = parameter_name.split('-') + shovel_links = self.get_shovel_links() + shovel_names = [link['name'] for link in shovel_links] + for name in shovel_names: + name_parts = name.split('-') + if parameter_parts[1] == name_parts[1]: + shovel_names_for_host.append(name) + # Check if there are other shovel connections to remote platform. If yes, we + # cannot delete the certs since others will need them + if delete_certs and len(shovel_names_for_host) >= 1: + print(f"Cannot delete certificates since there are other shovels " + f"connected to remote host: {parameter_parts[1]}") + return + except AttributeError as ex: + _log.error(f"Unable to reach RabbitMQ management API. Check if RabbitMQ server is running. " + f"If not running, start the server using start-rabbitmq script in root of source directory.") + return + + import os vhome = get_home() if component == 'shovel': config_file = os.path.join(vhome, 'rabbitmq_shovel_config.yml') @@ -766,19 +851,39 @@ def delete_multiplatform_parameter(self, component, parameter_name, vhost=None): config_file = os.path.join(vhome, 'rabbitmq_federation_config.yml') key = 'federation-upstream' config = read_config_file(config_file) - print("Removing certificate paths from the shovel config file. Please remove remote certificates manually " - "from the VOLTTRON_HOME folder if needed") - names = parameter_name.split("-") + # Delete certs from VOLTTRON_HOME + if delete_certs: + print(f"Removing certificate paths from VOLTTRON_HOME and from the config file") + names = parameter_name.split("-") - try: - del config[key][names[1]]['certificates'] - write_to_config_file(config_file, config) - except (KeyError, IndexError) as e: - print(f"names:{e}") - pass - self.delete_parameter(component, parameter_name, vhost, - ssl_auth=self.rmq_config.is_ssl) + certs_config = None + try: + certs_config = config[key][names[1]]['certificates'] + del config[key][names[1]]['certificates'] + write_to_config_file(config_file, config) + except (KeyError, IndexError) as e: + print(f"Error: Did not find certificates entry in {config_file}:{e}") + return + try: + private_key = certs_config['private_key'] + public_cert = certs_config['public_cert'] + remote_ca = certs_config['remote_ca'] + if os.path.exists(private_key): + os.remove(private_key) + private_dir, filename = os.path.split(private_key) + cert_name = filename[:-4] + '.crt' + cert_path = private_dir.replace('private', 'certs')+'/' + cert_name + + if os.path.exists(cert_path): + os.remove(cert_path) + if os.path.exists(public_cert): + os.remove(public_cert) + if os.path.exists(remote_ca): + os.remove(remote_ca) + except KeyError as e: + print(f"Error: Missing key in {config_file}: {e}") + pass def build_connection_param(self, rmq_user, ssl_auth=None, retry_attempt=30, retry_delay=2): """ @@ -969,7 +1074,7 @@ def build_agent_connection(self, identity, instance_name): # vctl certs create-ssl-keypair should be used to create a cert/key pair # and then agents should be started. try: - self.rmq_config.crts.create_signed_cert_files(rmq_user, overwrite=False) + c, k = self.rmq_config.crts.create_signed_cert_files(rmq_user, overwrite=False) except Exception as e: _log.error("Exception creating certs. {}".format(e)) raise RuntimeError(e) @@ -988,6 +1093,13 @@ def build_agent_connection(self, identity, instance_name): return param + def create_signed_certs(self, rmq_user): + try: + c, k = self.rmq_config.crts.create_signed_cert_files(rmq_user, overwrite=False) + except Exception as e: + _log.error("Exception creating certs. {}".format(e)) + raise RuntimeError(e) + def build_remote_plugin_connection(self, rmq_user, host, port, vhost, is_ssl, certs_dict=None): """ Check if RabbitMQ user and certs exists for this agent, if not @@ -1066,9 +1178,9 @@ def get_ssl_url_params(self, user=None, certs_dict=None): cert_file = self.rmq_config.crts.cert_file(user) key_file = self.rmq_config.crts.private_key_file(user) else: - ca_file = certs_dict['ca_file'] - cert_file = certs_dict['cert_file'] - key_file = certs_dict['key_file'] + ca_file = certs_dict['remote_ca'] + cert_file = certs_dict['public_cert'] + key_file = certs_dict['private_key'] return "cacertfile={ca}&certfile={cert}&keyfile={key}" \ "&verify=verify_peer&fail_if_no_peer_cert=true" \ "&auth_mechanism=external".format(ca=ca_file, diff --git a/volttron/utils/rmq_setup.py b/volttron/utils/rmq_setup.py index 78e219d724..33e85d7fe0 100644 --- a/volttron/utils/rmq_setup.py +++ b/volttron/utils/rmq_setup.py @@ -52,17 +52,18 @@ import yaml import time -from . rmq_mgmt import RabbitMQMgmt -from . rmq_config_params import RMQConfig, read_config_file, write_to_config_file +from .rmq_mgmt import RabbitMQMgmt +from .rmq_config_params import RMQConfig, read_config_file, write_to_config_file from volttron.platform import certs from volttron.platform import get_home from volttron.platform.agent.utils import (store_message_bus_config, execute_command) -from volttron.utils.prompt import prompt_response, y, y_or_n -from volttron.platform.agent.utils import get_platform_instance_name +from volttron.utils.prompt import prompt_response, y, n, y_or_n from volttron.platform import jsonapi from urllib.parse import urlparse +from volttron.platform.web import DiscoveryInfo +from volttron.platform.agent.utils import get_platform_instance_name, get_fq_identity _log = logging.getLogger(os.path.basename(__file__)) @@ -168,7 +169,55 @@ def write_env_file(rmq_config, conf_file, env=None): env_conf.write(env_entries) -def _create_federation_setup(admin_user, admin_password, is_ssl, vhost, vhome): +def _get_federation_certs(vhome): + federation_config_file = os.path.join(vhome, + 'rabbitmq_federation_config.yml') + federation_config = read_config_file(federation_config_file) + federation = federation_config.get('federation-upstream', {}) + success = False + update_needed = False + + try: + for host, upstream in federation.items(): + rmq_user = 'federation' + if 'certificates' not in upstream: + # certificates key not found in shovel config + https_port = upstream.get('https-port', 8443) + remote_addr = 'https://{}:{}'.format(host, https_port) + print(f"Certificates not found. Requesting CSR from {remote_addr}") + _log.debug(f"Certificates not found. Requesting CSR from {remote_addr}") + # request CSR from remote host + try: + ca_file, cert_file, prvt_file = _request_csr(rmq_user, remote_addr, 'federation') + except Exception as ex: + _log.error(f"{ex}") + ca_file = None + cert_file = None + prvt_file = None + + if ca_file is not None and cert_file is not None and prvt_file is not None: + upstream['certificates'] = {} + # root CA + upstream['certificates']['remote_ca'] = ca_file + # public cert + upstream['certificates']['public_cert'] = cert_file + # private_key + upstream['certificates']['private_key'] = prvt_file + update_needed = True + else: + success = True + if update_needed: + federation_config['federation-upstream'] = federation + write_to_config_file(federation_config_file, federation_config) + success = True + + except KeyError as ex: + _log.error(f"Federation config has missing key: {ex}") + success = False + return success + + +def _create_federation_setup(is_ssl, vhost, vhome): """ Creates a RabbitMQ federation of multiple VOLTTRON instances based on rabbitmq config. @@ -183,62 +232,116 @@ def _create_federation_setup(admin_user, admin_password, is_ssl, vhost, vhome): federation_config_file = os.path.join(vhome, 'rabbitmq_federation_config.yml') federation_config = read_config_file(federation_config_file) - federation = federation_config.get('federation-upstream') - - if federation: - #ssl_params = None - #if is_ssl: - # ssl_params = rmq_mgmt.get_ssl_url_params() + federation = federation_config.get('federation-upstream', {}) + federation_names = [] + try: + federation_links = rmq_mgmt.get_federation_links() + federation_names = [link['name'] for link in federation_links] + except AttributeError as ex: + _log.error(f"Unable to reach RabbitMQ management API. Check if RabbitMQ server is running. " + f"If not running, start the server using start-rabbitmq script in root of source directory.") + return - for host, upstream in federation.items(): + for host, upstream in federation.items(): + try: + name = "upstream-{host}-{vhost}".format(vhost=upstream['virtual-host'], + host=host) + if name in federation_names: + _log.error(f"Federation link with name: {name} already exists. " + "Skipping this configuration and moving to next one") + continue + if 'certificates' not in upstream: + _log.error(f"Certificates key missing in config.") + continue + + rmq_user = upstream['federation-user'] try: - name = "upstream-{host}-{vhost}".format(vhost=upstream['virtual-host'], - host=host) - _log.debug("Upstream Server: {name} ".format(name=name)) - - certs_dict = None - rmq_user = None - if 'certificates' in upstream: - _log.debug("upstream parameters under destination: {}".format(upstream)) - is_csr = upstream['certificates'].get('csr', False) - if is_csr: - certs_dict = dict() - certs_dict['ca_file'] = upstream['certificates']['remote_ca'] - certs_dict['cert_file'] = upstream['certificates']['public_cert'] - certs_dict['key_file'] = upstream['certificates']['private_key'] - rmq_user = upstream['federation-user'] - else: - # certificates key not found in upstream config - _log.debug("ERROR: certificates key not found in federation config. Cannot make connection to remote server without remote certificates") - continue # Build destination address address = rmq_mgmt.build_remote_plugin_connection(rmq_user, host, upstream['port'], upstream['virtual-host'], is_ssl, - certs_dict=certs_dict) - prop = dict(vhost=vhost, - component="federation-upstream", - name=name, - value={"uri": address}) - rmq_mgmt.set_parameter('federation-upstream', - name, - prop, - vhost) - - policy_name = 'volttron-federation' - policy_value = {"pattern": "^volttron", - "definition": {"federation-upstream-set": "all"}, - "priority": 0, - "apply-to": "exchanges"} - rmq_mgmt.set_policy(policy_name, - policy_value, - vhost) - except KeyError as ex: - _log.error("Federation setup did not complete. " - "Missing Key {key} in upstream config " - "{upstream}".format(key=ex, upstream=upstream)) + certs_dict=federation[host]['certificates']) + except Exception as ex: + _log.error("Exception occured while trying to establish rabbitmq connection. " + "Check if rabbitmq is running.") + return + + prop = dict(vhost=vhost, + component="federation-upstream", + name=name, + value={"uri": address}) + rmq_mgmt.set_parameter('federation-upstream', + name, + prop, + vhost) + + policy_name = 'volttron-federation' + policy_value = {"pattern": "^volttron", + "definition": {"federation-upstream-set": "all"}, + "priority": 0, + "apply-to": "exchanges"} + rmq_mgmt.set_policy(policy_name, + policy_value, + vhost) + import gevent + gevent.sleep(5) + print(f"Setup for federation with name: {name} is completed." + f"Status is: {rmq_mgmt.get_federation_link_status(name)}") + + except KeyError as ex: + _log.error("Federation setup did not complete. " + "Missing Key {key} in upstream config " + "{upstream}".format(key=ex, upstream=upstream)) + + +def _get_certs_for_shovel(vhome): + shovel_config_file = os.path.join(vhome, + 'rabbitmq_shovel_config.yml') + shovel_config = read_config_file(shovel_config_file) + shovels = shovel_config.get('shovel', {}) + update_needed = False + success = False + try: + for remote_host, shovel in shovels.items(): + if 'certificates' not in shovel: + shovel_user = shovel['shovel-user'] + https_port = shovel.get('https-port', 8443) + + # certificates key not found in shovel config + remote_addr = 'https://{}:{}'.format(remote_host, https_port) + print(f"Certificates not found. Requesting CSR from {remote_addr}") + + # request CSR from remote host + try: + ca_file, cert_file, prvt_file = _request_csr(shovel_user, remote_addr, 'shovel') + except Exception as ex: + _log.error(f"{ex}") + ca_file = None + cert_file = None + prvt_file = None + + if ca_file is not None and cert_file is not None and prvt_file is not None: + shovel['certificates'] = {} + # root CA + shovel['certificates']['remote_ca'] = ca_file + # public cert + shovel['certificates']['public_cert'] = cert_file + # private_key + shovel['certificates']['private_key'] = prvt_file + update_needed = True + else: + success = True + if update_needed: + shovel_config['shovel'] = shovels + write_to_config_file(shovel_config_file, shovel_config) + success = True + except KeyError as exc: + _log.error("Shovel config has missing Key: {}".format(exc)) + success = False + + return success def _create_shovel_setup(instance_name, local_host, port, vhost, vhome, is_ssl): @@ -251,38 +354,48 @@ def _create_shovel_setup(instance_name, local_host, port, vhost, vhome, is_ssl): shovel_config = read_config_file(shovel_config_file) shovels = shovel_config.get('shovel', {}) - ssl_params = None rmq_mgmt = RabbitMQMgmt() - _log.debug("shovel config: {}".format(shovel_config)) + shovel_names = [] + try: + shovel_links = rmq_mgmt.get_shovel_links() + shovel_names = [link['name'] for link in shovel_links] + except AttributeError as ex: + _log.error(f"Unable to reach RabbitMQ management API. Check if RabbitMQ server is running. " + f"If not running, start the server using start-rabbitmq script in root of source directory.") + return + try: for remote_host, shovel in shovels.items(): pubsub_config = shovel.get("pubsub", {}) - _log.debug("shovel parameters: {}".format(shovel)) + for identity, topics in pubsub_config.items(): # Build source address rmq_user = instance_name + '.' + identity - src_uri = rmq_mgmt.build_remote_plugin_connection(rmq_user, - local_host, port, - vhost, is_ssl) - certs_dict = None - if 'certificates' in shovel: - _log.debug("shovel parameters under destination: {}".format(shovel)) - is_csr = shovel['certificates'].get('csr', False) - if is_csr: - certs_dict = dict() - certs_dict['ca_file'] = shovel['certificates']['remote_ca'] - certs_dict['cert_file'] = shovel['certificates']['public_cert'] - certs_dict['key_file'] = shovel['certificates']['private_key'] - rmq_user = shovel['shovel-user'] - else: - # destination key not found in shovel config - _log.debug("ERROR: certificates key not found in shovel config. Cannot make connection to remote server without remote certificates") + try: + src_uri = rmq_mgmt.build_remote_plugin_connection(rmq_user, + local_host, port, + vhost, is_ssl) + except Exception as ex: + _log.error("Exception occured while trying to establish rabbitmq connection. " + "Check if rabbitmq is running.") + return + + if 'certificates' not in shovel: + _log.error(f"Certificates not found.\nContinuing with other configurations") continue - # Build destination address - dest_uri = rmq_mgmt.build_remote_plugin_connection(rmq_user, - remote_host, shovel['port'], - shovel['virtual-host'], - is_ssl, certs_dict=certs_dict) + rmq_user = shovel['shovel-user'] + try: + # Build destination address + dest_uri = rmq_mgmt.build_remote_plugin_connection(rmq_user, + remote_host, + shovel['port'], + shovel['virtual-host'], + is_ssl, + certs_dict=shovels[remote_host]['certificates']) + except Exception as ex: + _log.error("Exception occured while trying to establish rabbitmq connection. " + "Check if rabbitmq is running.") + return if not isinstance(topics, list): topics = [topics] @@ -291,9 +404,12 @@ def _create_shovel_setup(instance_name, local_host, port, vhost, vhome, is_ssl): topic)) name = "shovel-{host}-{topic}".format(host=remote_host, topic=topic) - routing_key = "__pubsub__.{instance}.{topic}.#".format( - instance=instance_name, - topic=topic) + if name in shovel_names: + _log.error(f"Shovel with name: {name} already exists. " + "Skipping this configuration and moving to next one") + continue + routing_key = "__pubsub__.{instance}.{topic}.#".format(instance=instance_name, + topic=topic) prop = dict(vhost=vhost, component="shovel", name=name, @@ -304,41 +420,50 @@ def _create_shovel_setup(instance_name, local_host, port, vhost, vhome, is_ssl): "dest-exchange": "volttron"} ) rmq_mgmt.set_parameter("shovel", - name, - prop) + name, + prop) + import gevent + gevent.sleep(2) + print(f"Setup for shovel with name: {name} is completed. " + f"Status is: {rmq_mgmt.get_shovel_link_status(name)}") rpc_config = shovel.get("rpc", {}) - _log.debug("RPC config: {}".format(rpc_config)) for remote_instance, agent_ids in rpc_config.items(): for ids in agent_ids: local_identity = ids[0] remote_identity = ids[1] rmq_user = instance_name + '.' + local_identity - src_uri = rmq_mgmt.build_shovel_connection(rmq_user, - local_host, port, - vhost, is_ssl) - - certs_dict = None - if 'certificates' in shovel: - _log.debug("shovel parameters under destination: {}".format(shovel)) - is_csr = shovel['certificates'].get('csr', False) - if is_csr: - certs_dict = dict() - certs_dict['ca_file'] = shovel['certificates']['remote_ca'] - certs_dict['cert_file'] = shovel['certificates']['public_cert'] - certs_dict['key_file'] = shovel['certificates']['private_key'] - rmq_user = shovel['shovel-user'] - _log.debug(f"certs parameters: {certs_dict}") + src_uri = rmq_mgmt.build_remote_plugin_connection(rmq_user, + local_host, port, + vhost, is_ssl) + + if 'certificates' not in shovel: + _log.error("Certificates not found.\nContinuing with other configurations") + continue + + rmq_user = shovel['shovel-user'] + try: + # Build destination address + dest_uri = rmq_mgmt.build_remote_plugin_connection(rmq_user, + remote_host, + shovel['port'], + shovel['virtual-host'], + is_ssl, + certs_dict=shovels[remote_host][ + 'certificates']) + except Exception as ex: + _log.error("Exception occured while trying to establish rabbitmq connection. " + "Check if rabbitmq is running.") + return - # Build destination address - dest_uri = rmq_mgmt.build_shovel_connection(rmq_user, - remote_host, shovel['port'], - shovel['virtual-host'], - is_ssl, certs_dict=certs_dict) _log.info("Creating shovel to make RPC call to remote Agent" ": {}".format(remote_identity)) name = "shovel-{host}-{identity}".format(host=remote_host, identity=local_identity) + if name in shovel_names: + _log.error(f"Shovel with name: {name} already exists. " + "Skipping this configuration and moving to next one") + continue routing_key = "{instance}.{identity}.#".format( instance=remote_instance, identity=remote_identity) @@ -353,11 +478,16 @@ def _create_shovel_setup(instance_name, local_host, port, vhost, vhome, is_ssl): ) rmq_mgmt.set_parameter("shovel", - name, - prop) + name, + prop) + import gevent + gevent.sleep(2) + print(f"Setup for shovel with name: {name} is completed. " + f"Status is: {rmq_mgmt.get_shovel_link_status(name)}") + + except KeyError as exc: - _log.error("Shovel setup did not complete. Missing Key: {}".format( - exc)) + _log.error("Shovel setup did not complete. Missing Key: {}".format(exc)) def _setup_for_ssl_auth(rmq_config, rmq_conf_file, env=None): @@ -515,7 +645,7 @@ def _create_certs(rmq_config, admin_client_name, server_cert_name): 'organization-unit', 'common-name']) or all( - k in cert_data for k in ['ca-public-key', 'ca-private-key'])): + k in cert_data for k in ['ca-public-key', 'ca-private-key'])): _log.error( "\nNo certificate data found in {} or certificate data is " "incomplete. certificate-data should either contain all " @@ -573,7 +703,7 @@ def _verify_and_save_instance_ca(rmq_config, instance_ca_path, instance_ca_key): def setup_rabbitmq_volttron(setup_type, verbose=False, prompt=False, instance_name=None, - rmq_conf_file=None, env=None): + rmq_conf_file=None, max_retries=12, env=None): """ Setup VOLTTRON instance to run with RabbitMQ message bus. :param setup_type: @@ -595,15 +725,17 @@ def setup_rabbitmq_volttron(setup_type, verbose=False, prompt=False, instance_na store_message_bus_config(message_bus='rmq', instance_name=instance_name) rmq_config = RMQConfig() + success = True if prompt: - # ignore any existing rabbitmq_config.yml in vhome. Prompt user and - # generate a new rabbitmq_config.yml + # ignore any existing rabbitmq_config.yml|rabbitmq_federation_config.yml|rabbitmq_shovel_config.yml in vhome. + # Prompt user and generate a new rabbitmq_config.yml try: - success = _create_rabbitmq_config(rmq_config, setup_type, verbose) + success = _create_rabbitmq_config(rmq_config, setup_type, verbose, max_retries) except Exception as exc: _log.error(f"{exc}") return exc + if not success: # something went wrong when creating rmq config # do not create anything. return @@ -611,6 +743,7 @@ def setup_rabbitmq_volttron(setup_type, verbose=False, prompt=False, instance_na # Load either the newly created config or config passed try: rmq_config.load_rmq_config() + _log.debug(f"RMQConfig : {setup_type}, {rmq_config.volttron_home}, os env: {os.environ['VOLTTRON_HOME']}") except (yaml.parser.ParserError, yaml.scanner.ScannerError, yaml.YAMLError) as exc: _log.error("Error: YAML file cannot parsed properly. Check the contents of the file") @@ -687,11 +820,12 @@ def setup_rabbitmq_volttron(setup_type, verbose=False, prompt=False, instance_na if setup_type in ["all", "federation"]: # Create a multi-platform federation setup invalid = False - _create_federation_setup(rmq_config.admin_user, - rmq_config.admin_pwd, - rmq_config.is_ssl, - rmq_config.virtual_host, - rmq_config.volttron_home) + s = _get_federation_certs(rmq_config.volttron_home) + + if s: + _create_federation_setup(rmq_config.is_ssl, + rmq_config.virtual_host, + rmq_config.volttron_home) if setup_type in ["all", "shovel"]: # Create shovel setup invalid = False @@ -699,17 +833,22 @@ def setup_rabbitmq_volttron(setup_type, verbose=False, prompt=False, instance_na port = rmq_config.amqp_port_ssl else: port = rmq_config.amqp_port - _create_shovel_setup(rmq_config.instance_name, - rmq_config.hostname, - port, - rmq_config.virtual_host, - rmq_config.volttron_home, - rmq_config.is_ssl) + + # Check if certs are available in shovel config. If missing, request CSR + s = _get_certs_for_shovel(rmq_config.volttron_home) + + if s: + _create_shovel_setup(rmq_config.instance_name, + rmq_config.hostname, + port, + rmq_config.virtual_host, + rmq_config.volttron_home, + rmq_config.is_ssl) if invalid: _log.error("Unknown option. Exiting....") -def _create_rabbitmq_config(rmq_config, setup_type, verbose=False): +def _create_rabbitmq_config(rmq_config, setup_type, verbose=False, max_retries=12): """ Prompt user for required details and create a rabbitmq_config.yml file in volttron home @@ -776,7 +915,7 @@ def _create_rabbitmq_config(rmq_config, setup_type, verbose=False): if is_file_readable(root_public): break while True: - prompt =\ + prompt = \ 'Enter the root CA certificate private key file:' root_key = prompt_response(prompt, mandatory=True) if is_file_readable(root_key): @@ -827,12 +966,12 @@ def _create_rabbitmq_config(rmq_config, setup_type, verbose=False): # if option was all then config_opts would be not null # if this was called with just setup_type = federation, load existing # config so that we don't overwrite existing federation configs - prompt_upstream_servers(rmq_config.volttron_home) + success = prompt_upstream_servers(rmq_config.volttron_home, verbose, max_retries) if setup_type in ['shovel', 'all']: # if option was all then config_opts would be not null # if this was called with just setup_type = shovel, load existing # config so that we don't overwrite existing list - success = prompt_shovels(rmq_config.volttron_home, verbose) + success = prompt_shovels(rmq_config.volttron_home, verbose, max_retries) return success @@ -890,7 +1029,7 @@ def _prompt_ssl(): return False -def prompt_upstream_servers(vhome): +def prompt_upstream_servers(vhome, verbose=False, max_retries=12): """ Prompt for upstream server configurations and save in rabbitmq_federation_config.yml @@ -900,40 +1039,63 @@ def prompt_upstream_servers(vhome): 'rabbitmq_federation_config.yml') if os.path.exists(federation_config_file): - federation_config = read_config_file(federation_config_file) - else: - federation_config = {} + prompt = "rabbitmq_federation_config.yml exists in {} Do you wish to " \ + "use this file to configure federation".format(federation_config_file) + prompt = prompt_response(prompt, + valid_answers=y_or_n, + default='Y') + if prompt in y: + return True + else: + _log.info("New input data will be used to overwrite existing " + "{}".format(federation_config_file)) + federation_config = {} upstream_servers = federation_config.get('federation-upstream', {}) prompt = 'Number of upstream servers to configure:' count = prompt_response(prompt, default=1) count = int(count) i = 0 - for i in range(0, count): - prompt = 'Hostname of the upstream server: ' - host = prompt_response(prompt, mandatory=True) - prompt = 'Port of the upstream server: ' - port = prompt_response(prompt, default=5671) - prompt = 'Virtual host of the upstream server: ' - vhost = prompt_response(prompt, default='volttron') + try: + for i in range(0, count): + prompt = 'Hostname of the upstream server: ' + host = prompt_response(prompt, mandatory=True) + prompt = 'Port of the upstream server: ' + port = prompt_response(prompt, default=5671) + prompt = 'Virtual host of the upstream server: ' + vhost = prompt_response(prompt, default='volttron') - upstream_servers[host] = {'port': port, - 'virtual-host': vhost} + upstream_servers[host] = {'port': port, + 'virtual-host': vhost} - rmq_mgmt = RabbitMQMgmt() - instance_name = get_platform_instance_name() - upstream_user = 'federation' - rmq_mgmt.build_agent_connection(upstream_user, instance_name) - import time - time.sleep(2) - upstream_servers[host]['federation-user'] = instance_name + "." + upstream_user - upstream_servers[host]['certificates'] = _prompt_csr_request(upstream_user, host, 'federation') - federation_config['federation-upstream'] = upstream_servers - write_to_config_file(federation_config_file, federation_config) - - -def prompt_shovels(vhome, verbose=False): + rmq_mgmt = RabbitMQMgmt() + instance_name = get_platform_instance_name() + upstream_user = 'federation' + + upstream_servers[host]['federation-user'] = instance_name + "." + upstream_user + certs_config, https_port = _prompt_csr_request(upstream_user, + host, + 'federation', + verbose, + max_retries) + if not certs_config: + # we did not get certificates - neither existing, nor through csr process + # exit + return False + upstream_servers[host]['certificates'] = certs_config + upstream_servers[host]['https-port'] = https_port + except (IOError, TimeoutError, ConnectionError) as e: + raise e + except ValueError as e: + _log.error("Invalid choice in the configuration: {}".format(e)) + else: + federation_config['federation-upstream'] = upstream_servers + write_to_config_file(federation_config_file, federation_config) + return True + + +def prompt_shovels(vhome, verbose=False, max_retries=12): """ Prompt for shovel configuration and save in rabbitmq_shovel_config.yml :return: @@ -941,10 +1103,18 @@ def prompt_shovels(vhome, verbose=False): shovel_config_file = os.path.join(vhome, 'rabbitmq_shovel_config.yml') if os.path.exists(shovel_config_file): - shovel_config = read_config_file(shovel_config_file) - else: - shovel_config = {} + prompt = "rabbitmq_shovel_config.yml exists in {} Do you wish to " \ + "use this file to configure shovels".format(shovel_config_file) + prompt = prompt_response(prompt, + valid_answers=y_or_n, + default='Y') + if prompt in y: + return True + else: + _log.info("New input data will be used to overwrite existing " + "{}".format(shovel_config_file)) + shovel_config = {} shovels = shovel_config.get('shovels', {}) prompt = 'Number of destination hosts to configure:' count = prompt_response(prompt, default=1) @@ -964,22 +1134,18 @@ def prompt_shovels(vhome, verbose=False): 'virtual-host': vhost} rmq_mgmt = RabbitMQMgmt() instance_name = get_platform_instance_name() - shovel_user = 'shovel{}'.format(host) + prompt = 'Name for the shovel user: ' + shovel_user = prompt_response(prompt, mandatory=True) - rmq_mgmt.build_agent_connection(shovel_user, instance_name) - import time - - time.sleep(2) shovels[host]['shovel-user'] = instance_name + "." + shovel_user - #_log.debug("shovel_user: {}".format(shovel_user)) - - certs_config = _prompt_csr_request(shovel_user, host, 'shovel', verbose) + certs_config, https_port = _prompt_csr_request(shovel_user, host, 'shovel', verbose, max_retries) if not certs_config: # we did not get certificates - neither existing, nor through csr process # exit return False shovels[host]['certificates'] = certs_config + shovels[host]['https-port'] = https_port prompt = prompt_response('\nDo you want shovels for ' 'PUBSUB communication? ', @@ -996,7 +1162,7 @@ def prompt_shovels(vhome, verbose=False): import re topics = re.sub(r"\s", "", topics) multi_topics = topics.split(",") - shovels[host]['pubsub'] = {agent_id : multi_topics} + shovels[host]['pubsub'] = {agent_id: multi_topics} prompt = prompt_response( '\nDo you want shovels for RPC communication? ', valid_answers=y_or_n, default='N') @@ -1024,17 +1190,17 @@ def prompt_shovels(vhome, verbose=False): return True -def _prompt_csr_request(rmq_user, host, type, verbose=False): +def _prompt_csr_request(rmq_user, host, type, verbose=False, max_retries=12): prompt = prompt_response('\nDo you have certificates signed by remote CA? ', valid_answers=y_or_n, default='N') - csr_config = dict() + https_port = None if prompt in y: prompt = 'Full path to remote CA certificate: ' ca_file = prompt_response(prompt, default='') - csr_config['csr'] = True + if not os.path.exists(ca_file): raise IOError(f"Path does not exist: {ca_file}. Please check the path and try again") # ca cert @@ -1059,29 +1225,27 @@ def _prompt_csr_request(rmq_user, host, type, verbose=False): remote_addr = prompt_response(prompt, default=remote_https_address) parsed_address = urlparse(remote_addr) + https_port = parsed_address.port + if parsed_address.scheme not in ('https',): raise IOError(f"Remote web interface is not valid: {parsed_address}. Please check and try again") # request CSR from remote host - ca_file, cert_file, prvt_file = _request_csr(rmq_user, remote_addr, type,verbose) + ca_file, cert_file, prvt_file = _request_csr(rmq_user, remote_addr, type, verbose, max_retries) if ca_file is not None and cert_file is not None and prvt_file is not None: - csr_config['csr'] = True - # _log.debug("CA file path: {}".format(ca_file)) + # ca cert csr_config['remote_ca'] = ca_file # public cert csr_config['public_cert'] = cert_file - # _log.debug("Public cert path: {}".format(certfile)) # private_key - crts = certs.Certs() - # _log.debug("Private cert path: {}".format(prvtfile)) csr_config['private_key'] = prvt_file - return csr_config + return csr_config, https_port -def _request_csr(rmq_user, remote_addr, type, verbose=False): +def _request_csr(rmq_user, remote_addr, type, verbose=False, cert_exists=True, max_retries=12): ca_file = None certfile = None prvtfile = None @@ -1090,11 +1254,14 @@ def _request_csr(rmq_user, remote_addr, type, verbose=False): # so that we don't get info level logs showing up during our multiple csr requests logging.getLogger("volttron.platform.web.discovery").setLevel(logging.WARNING) + rmqmgmt = RabbitMQMgmt() + fqid_local = get_fq_identity(rmq_user) + rmqmgmt.create_signed_certs(fqid_local) + response = request_cert_for_plugin(rmq_user, remote_addr, type) success = False retry_attempt = 0 - max_retries = 12 denied = False if response is None: # Error /status is pending @@ -1105,7 +1272,6 @@ def _request_csr(rmq_user, remote_addr, type, verbose=False): # Try for two minutes. # TODO make max attempts and/or sleep interval optional arg while not success and retry_attempt < max_retries: - if response is None: break elif response[0] == 'PENDING': @@ -1207,7 +1373,9 @@ def request_plugin_cert(csr_server, fully_qualified_local_identity, discovery_in raise ValueError("Only can create csr for rabbitmq based platform in ssl mode.") crts = certs.Certs() + csr_request = crts.create_csr(fully_qualified_local_identity, discovery_info.instance_name) + # The csr request requires the fully qualified identity that is # going to be connected to the external instance. # @@ -1215,6 +1383,7 @@ def request_plugin_cert(csr_server, fully_qualified_local_identity, discovery_in # concatenated with the identity of the local fully qualified identity. remote_cert_name = "{}.{}".format(discovery_info.instance_name, fully_qualified_local_identity) + _log.debug(f"Remote cert name: {remote_cert_name}, fqid: {fully_qualified_local_identity}") remote_ca_name = discovery_info.instance_name + "_ca" json_request = dict( @@ -1227,6 +1396,7 @@ def request_plugin_cert(csr_server, fully_qualified_local_identity, discovery_in verify=False) response = grequests.map([request]) + _log.debug(f"request_plugin_cert: grequest response: {response}") if response and isinstance(response, list): response[0].raise_for_status() response = response[0] @@ -1239,10 +1409,10 @@ def request_plugin_cert(csr_server, fully_qualified_local_identity, discovery_in remote_certs_dir = get_remote_certs_dir(type) if status == 'SUCCESSFUL' or status == 'APPROVED': crts.save_agent_remote_info(remote_certs_dir, - fully_qualified_local_identity, - remote_cert_name, cert.encode("utf-8"), - remote_ca_name, - discovery_info.rmq_ca_cert.encode("utf-8")) + fully_qualified_local_identity, + remote_cert_name, cert.encode("utf-8"), + remote_ca_name, + discovery_info.rmq_ca_cert.encode("utf-8")) os.environ['REQUESTS_CA_BUNDLE'] = os.path.join(remote_certs_dir, "requests_ca_bundle") elif status == 'PENDING': pass @@ -1266,9 +1436,8 @@ def request_plugin_cert(csr_server, fully_qualified_local_identity, discovery_in def request_cert_for_plugin(rmq_user, https_address, type): value = None parsed_address = urlparse(https_address) + if parsed_address.scheme in ('https',): - from volttron.platform.web import DiscoveryInfo - from volttron.platform.agent.utils import get_platform_instance_name, get_fq_identity info = DiscoveryInfo.request_discovery_info(https_address) # This is if both remote and local are rmq message buses. @@ -1278,8 +1447,10 @@ def request_cert_for_plugin(rmq_user, https_address, type): # Check if we already have the cert, if so use it instead of requesting cert again remote_certs_dir = get_remote_certs_dir(type) remote_cert_name = "{}.{}".format(info.instance_name, fqid_local) + _log.debug(f"Remote cert name: {remote_cert_name}") certfile = os.path.join(remote_certs_dir, remote_cert_name + ".crt") + _log.debug(f"request_cert_for_plugin:{certfile}") if os.path.exists(certfile): value = certfile else: @@ -1354,7 +1525,7 @@ def start_rabbit(rmq_home, env=None): _log.debug("Rabbitmq is not running. Attempting to start") msg = "Error starting rabbitmq at {}".format(rmq_home) # attempt to start once - execute_command(start_cmd, env=env, err_prefix=msg, logger=_log) + execute_command(start_cmd, env=env, err_prefix=msg, logger=_log) start = False else: if i > 60: # if more than 60 tries we assume something failed @@ -1366,8 +1537,7 @@ def start_rabbit(rmq_home, env=None): if __name__ == "__main__": - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('setup_type', help='Instance type: all, single, federation or shovel') parser.add_argument('prompt', default=False, @@ -1377,4 +1547,3 @@ def start_rabbit(rmq_home, env=None): setup_rabbitmq_volttron(args.setup_type, args.prompt) except KeyboardInterrupt: _log.info("Exiting setup process") - diff --git a/volttrontesting/fixtures/rmq_test_setup.py b/volttrontesting/fixtures/rmq_test_setup.py index 4d7143fba8..0b9b365a5e 100644 --- a/volttrontesting/fixtures/rmq_test_setup.py +++ b/volttrontesting/fixtures/rmq_test_setup.py @@ -191,6 +191,7 @@ def create_rmq_volttron_setup(vhome=None, ssl_auth=False, env=None, prompt=False, instance_name=rabbit_config_obj.instance_name, rmq_conf_file=rabbit_config_obj.rmq_conf_file, + max_retries=5, env=env) return rabbit_config_obj diff --git a/volttrontesting/fixtures/volttron_platform_fixtures.py b/volttrontesting/fixtures/volttron_platform_fixtures.py index d9450f5459..383313863b 100644 --- a/volttrontesting/fixtures/volttron_platform_fixtures.py +++ b/volttrontesting/fixtures/volttron_platform_fixtures.py @@ -3,23 +3,29 @@ from pathlib import Path import shutil from typing import Optional +from urllib.parse import urlparse import psutil import pytest -from volttron.platform import is_rabbitmq_available +from volttron.platform import is_rabbitmq_available, is_web_available from volttron.platform import update_platform_config from volttron.utils import get_random_key from volttrontesting.fixtures.cert_fixtures import certs_profile_1 -from volttrontesting.utils.platformwrapper import PlatformWrapper +from volttrontesting.utils.platformwrapper import PlatformWrapper, with_os_environ from volttrontesting.utils.platformwrapper import create_volttron_home from volttrontesting.utils.utils import get_hostname_and_random_port, get_rand_vip, get_rand_ip_and_port +from volttron.utils.rmq_mgmt import RabbitMQMgmt +from volttron.utils.rmq_setup import start_rabbit PRINT_LOG_ON_SHUTDOWN = False HAS_RMQ = is_rabbitmq_available() +HAS_WEB = is_web_available() + ci_skipif = pytest.mark.skipif(os.getenv('CI', None) == 'true', reason='SSL does not work in CI') rmq_skipif = pytest.mark.skipif(not HAS_RMQ, reason='RabbitMQ is not setup and/or SSL does not work in CI') +web_skipif = pytest.mark.skipif(not HAS_WEB, reason='Web libraries are not installed') def print_log(volttron_home): @@ -270,7 +276,7 @@ def volttron_instance_web(request): @pytest.fixture(scope="module", params=[ - dict(sink='zmq_web', source='zmq', zmq_ssl=False), + pytest.param(dict(sink='zmq_web', source='zmq', zmq_ssl=False), marks=web_skipif), pytest.param(dict(sink='zmq_web', source='zmq', zmq_ssl=True), marks=ci_skipif), pytest.param(dict(sink='rmq_web', source='zmq', zmq_ssl=False), marks=rmq_skipif), pytest.param(dict(sink='rmq_web', source='rmq', zmq_ssl=False), marks=rmq_skipif), @@ -489,3 +495,202 @@ def get_test_volttron_home(messagebus: str, web_https=False, web_http=False, has os.environ.update(env_cpy) if not os.environ.get("DEBUG", 0) != 1 and not os.environ.get("DEBUG_MODE", 0): shutil.rmtree(volttron_home, ignore_errors=True) + + +@pytest.fixture(scope="module") +def federated_rmq_instances(request, **kwargs): + """ + Create two rmq based volttron instances. One to act as producer of data and one to act as consumer of data + producer is upstream instance and consumer is the downstream instance + + :return: 2 volttron instances - (producer, consumer) that are federated + """ + upstream_vip = get_rand_vip() + upstream_hostname, upstream_https_port = get_hostname_and_random_port() + web_address = 'https://{hostname}:{port}'.format(hostname=upstream_hostname, port=upstream_https_port) + upstream = build_wrapper(upstream_vip, + ssl_auth=True, + messagebus='rmq', + should_start=True, + bind_web_address=web_address, + instance_name='volttron1', + **kwargs) + upstream.enable_auto_csr() + downstream_vip = get_rand_vip() + hostname, https_port = get_hostname_and_random_port() + downstream_web_address = 'https://{hostname}:{port}'.format(hostname=hostname, port=https_port) + + downstream = build_wrapper(downstream_vip, + ssl_auth=True, + messagebus='rmq', + should_start=False, + bind_web_address=downstream_web_address, + instance_name='volttron2', + **kwargs) + + link_name = None + rmq_mgmt = None + try: + # create federation config and save in volttron home of 'downstream' instance + content = dict() + fed = dict() + fed[upstream.rabbitmq_config_obj.rabbitmq_config["host"]] = { + 'port': upstream.rabbitmq_config_obj.rabbitmq_config["amqp-port-ssl"], + 'virtual-host': upstream.rabbitmq_config_obj.rabbitmq_config["virtual-host"], + 'https-port': upstream_https_port, + 'federation-user': "{}.federation".format(downstream.instance_name)} + content['federation-upstream'] = fed + import yaml + config_path = os.path.join(downstream.volttron_home, "rabbitmq_federation_config.yml") + with open(config_path, 'w') as yaml_file: + yaml.dump(content, yaml_file, default_flow_style=False) + + # setup federation link from 'downstream' to 'upstream' instance + downstream.setup_federation(config_path) + + downstream.startup_platform(vip_address=downstream_vip, + bind_web_address=downstream_web_address) + with with_os_environ(downstream.env): + rmq_mgmt = RabbitMQMgmt() + links = rmq_mgmt.get_federation_links() + assert links and links[0]['status'] == 'running' + link_name = links[0]['name'] + + except Exception as e: + print("Exception setting up federation: {}".format(e)) + upstream.shutdown_platform() + if downstream.is_running(): + downstream.shutdown_platform() + raise e + + yield upstream, downstream + + if link_name and rmq_mgmt: + rmq_mgmt.delete_multiplatform_parameter('federation-upstream', link_name) + upstream.shutdown_platform() + downstream.shutdown_platform() + + +@pytest.fixture(scope="module") +def two_way_federated_rmq_instances(request, **kwargs): + """ + Create two rmq based volttron instances. Create bi-directional data flow channel + by creating 2 federation links + + :return: 2 volttron instances - that are connected through federation + """ + instance_1_vip = get_rand_vip() + instance_1_hostname, instance_1_https_port = get_hostname_and_random_port() + instance_1_web_address = 'https://{hostname}:{port}'.format(hostname=instance_1_hostname, + port=instance_1_https_port) + + instance_1 = build_wrapper(instance_1_vip, + ssl_auth=True, + messagebus='rmq', + should_start=True, + bind_web_address=instance_1_web_address, + instance_name='volttron1', + **kwargs) + + instance_1.enable_auto_csr() + + instance_2_vip = get_rand_vip() + instance_2_hostname, instance_2_https_port = get_hostname_and_random_port() + instance_2_webaddress = 'https://{hostname}:{port}'.format(hostname=instance_2_hostname, + port=instance_2_https_port) + + instance_2 = build_wrapper(instance_2_vip, + ssl_auth=True, + messagebus='rmq', + should_start=False, + bind_web_address=instance_2_webaddress, + instance_name='volttron2', + **kwargs) + + instance_2_link_name = None + instance_1_link_name = None + + try: + # create federation config and setup federation link to instance_1 + content = dict() + fed = dict() + fed[instance_1.rabbitmq_config_obj.rabbitmq_config["host"]] = { + 'port': instance_1.rabbitmq_config_obj.rabbitmq_config["amqp-port-ssl"], + 'virtual-host': instance_1.rabbitmq_config_obj.rabbitmq_config["virtual-host"], + 'https-port': instance_1_https_port, + 'federation-user': "{}.federation".format(instance_2.instance_name)} + content['federation-upstream'] = fed + import yaml + config_path = os.path.join(instance_2.volttron_home, "rabbitmq_federation_config.yml") + with open(config_path, 'w') as yaml_file: + yaml.dump(content, yaml_file, default_flow_style=False) + + print(f"instance 2 Fed config path:{config_path}, content: {content}") + + instance_2.setup_federation(config_path) + instance_2.startup_platform(vip_address=instance_2_vip, bind_web_address=instance_2_webaddress) + instance_2.enable_auto_csr() + # Check federation link status + with with_os_environ(instance_2.env): + rmq_mgmt = RabbitMQMgmt() + links = rmq_mgmt.get_federation_links() + print(f"instance 2 fed links state: {links[0]['status']}") + assert links and links[0]['status'] == 'running' + instance_2_link_name = links[0]['name'] + + instance_1.skip_cleanup = True + instance_1.shutdown_platform() + instance_1.skip_cleanup = False + + start_rabbit(rmq_home=instance_1.rabbitmq_config_obj.rmq_home, env=instance_1.env) + + # create federation config and setup federation to instance_2 + content = dict() + fed = dict() + fed[instance_2.rabbitmq_config_obj.rabbitmq_config["host"]] = { + 'port': instance_2.rabbitmq_config_obj.rabbitmq_config["amqp-port-ssl"], + 'virtual-host': instance_2.rabbitmq_config_obj.rabbitmq_config["virtual-host"], + 'https-port': instance_2_https_port, + 'federation-user': "{}.federation".format(instance_1.instance_name)} + content['federation-upstream'] = fed + import yaml + config_path = os.path.join(instance_1.volttron_home, "rabbitmq_federation_config.yml") + with open(config_path, 'w') as yaml_file: + yaml.dump(content, yaml_file, default_flow_style=False) + + print(f"instance 1 Fed config path:{config_path}, content: {content}") + + instance_1.setup_federation(config_path) + instance_1.startup_platform(vip_address=instance_1_vip, bind_web_address=instance_1_web_address) + import gevent + gevent.sleep(10) + # Check federation link status + with with_os_environ(instance_1.env): + rmq_mgmt = RabbitMQMgmt() + links = rmq_mgmt.get_federation_links() + print(f"instance 1 fed links state: {links[0]['status']}") + assert links and links[0]['status'] == 'running' + instance_1_link_name = links[0]['name'] + + except Exception as e: + print(f"Exception setting up federation: {e}") + instance_1.shutdown_platform() + instance_2.shutdown_platform() + raise e + + yield instance_1, instance_2 + + if instance_1_link_name: + with with_os_environ(instance_1.env): + rmq_mgmt = RabbitMQMgmt() + rmq_mgmt.delete_multiplatform_parameter('federation-upstream', + instance_1_link_name) + if instance_2_link_name: + with with_os_environ(instance_2.env): + rmq_mgmt = RabbitMQMgmt() + rmq_mgmt.delete_multiplatform_parameter('federation-upstream', + instance_2_link_name) + instance_1.shutdown_platform() + instance_2.shutdown_platform() + + diff --git a/volttrontesting/multiplatform/test_federation.py b/volttrontesting/multiplatform/test_federation.py new file mode 100644 index 0000000000..6dd0a651a7 --- /dev/null +++ b/volttrontesting/multiplatform/test_federation.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- {{{ +# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et: +# +# Copyright 2020, Battelle Memorial Institute. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This material was prepared as an account of work sponsored by an agency of +# the United States Government. Neither the United States Government nor the +# United States Department of Energy, nor Battelle, nor any of their +# employees, nor any jurisdiction or organization that has cooperated in the +# development of these materials, makes any warranty, express or +# implied, or assumes any legal liability or responsibility for the accuracy, +# completeness, or usefulness or any information, apparatus, product, +# software, or process disclosed, or represents that its use would not infringe +# privately owned rights. Reference herein to any specific commercial product, +# process, or service by trade name, trademark, manufacturer, or otherwise +# does not necessarily constitute or imply its endorsement, recommendation, or +# favoring by the United States Government or any agency thereof, or +# Battelle Memorial Institute. The views and opinions of authors expressed +# herein do not necessarily state or reflect those of the +# United States Government or any agency thereof. +# +# PACIFIC NORTHWEST NATIONAL LABORATORY operated by +# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY +# under Contract DE-AC05-76RL01830 +# }}} +""" +pytest test cases base historian to test all_platform configuration. +By default all_platform is set to False and historian subscribes only to topics from local message bus. +When all_platforms=True, historian will subscribe to topics from all connected platforms + +""" + +import gevent +import pytest + +from volttron.platform import get_examples +from volttron.platform.agent.known_identities import CONTROL + + +@pytest.mark.federation +def test_federation_pubsub(federated_rmq_instances): + upstream, downstream = federated_rmq_instances + assert upstream.is_running() + assert downstream.is_running() + + subscription_results2 = {} + subscription_results3 = {} + subscriber = downstream.dynamic_agent + publisher = upstream.dynamic_agent + + def callback2(peer, sender, bus, topic, headers, message): + subscription_results2[topic] = {'headers': headers, 'message': message} + print("platform2 sub results [{}] = {}".format(topic, subscription_results2[topic])) + + def callback3(peer, sender, bus, topic, headers, message): + subscription_results3[topic] = {'headers': headers, 'message': message} + print("platform2 sub results [{}] = {}".format(topic, subscription_results3[topic])) + + subscriber.vip.pubsub.subscribe(peer='pubsub', + prefix='devices/campus/building1', + callback=callback2, + all_platforms=True) + + subscriber.vip.pubsub.subscribe(peer='pubsub', + prefix='analysis', + callback=callback3, + all_platforms=True) + + gevent.sleep(1) + for i in range(5): + publisher.vip.pubsub.publish(peer='pubsub', topic='devices/campus/building1', message=[{'point': 'value'}]) + gevent.sleep(1) + message = subscription_results2['devices/campus/building1']['message'] + assert message == [{'point': 'value'}] + + for i in range(5): + publisher.vip.pubsub.publish(peer='pubsub', + topic='analysis/airside/campus/building1', + message=[{'result': 'pass'}]) + gevent.sleep(1) + message = subscription_results3['analysis/airside/campus/building1']['message'] + assert message == [{'result': 'pass'}] + + +@pytest.mark.federation +def test_federation_rpc(two_way_federated_rmq_instances): + instance_1, instance_2 = two_way_federated_rmq_instances + assert instance_1.is_running() + assert instance_2.is_running() + + auuid = None + try: + auuid = instance_2.install_agent( + agent_dir=get_examples("ListenerAgent"), start=True) + assert auuid is not None + test_agent = instance_1.dynamic_agent + kwargs = {"external_platform": instance_2.instance_name} + agts = test_agent.vip.rpc.call(CONTROL, + 'list_agents', + **kwargs).get(timeout=10) + + assert agts[0]['identity'].startswith('listener') + listener_uuid = agts[0]['uuid'] + test_agent.vip.rpc.call(CONTROL, + 'stop_agent', + listener_uuid, + **kwargs).get(timeout=10) + agt_status = test_agent.vip.rpc.call(CONTROL, + 'agent_status', + listener_uuid, + **kwargs).get(timeout=10) + assert agt_status[1] == 0 + finally: + if instance_2.is_running: + instance_2.remove_agent(auuid) + diff --git a/volttrontesting/multiplatform/test_shovel.py b/volttrontesting/multiplatform/test_shovel.py new file mode 100644 index 0000000000..31020ddf2a --- /dev/null +++ b/volttrontesting/multiplatform/test_shovel.py @@ -0,0 +1,319 @@ +# -*- coding: utf-8 -*- {{{ +# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et: +# +# Copyright 2020, Battelle Memorial Institute. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This material was prepared as an account of work sponsored by an agency of +# the United States Government. Neither the United States Government nor the +# United States Department of Energy, nor Battelle, nor any of their +# employees, nor any jurisdiction or organization that has cooperated in the +# development of these materials, makes any warranty, express or +# implied, or assumes any legal liability or responsibility for the accuracy, +# completeness, or usefulness or any information, apparatus, product, +# software, or process disclosed, or represents that its use would not infringe +# privately owned rights. Reference herein to any specific commercial product, +# process, or service by trade name, trademark, manufacturer, or otherwise +# does not necessarily constitute or imply its endorsement, recommendation, or +# favoring by the United States Government or any agency thereof, or +# Battelle Memorial Institute. The views and opinions of authors expressed +# herein do not necessarily state or reflect those of the +# United States Government or any agency thereof. +# +# PACIFIC NORTHWEST NATIONAL LABORATORY operated by +# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY +# under Contract DE-AC05-76RL01830 +# }}} +""" +pytest test cases base historian to test all_platform configuration. +By default all_platform is set to False and historian subscribes only to topics from local message bus. +When all_platforms=True, historian will subscribe to topics from all connected platforms + +""" + +import gevent +import pytest +from urllib.parse import urlparse +import os + +from volttron.platform import get_examples +from volttron.platform.agent.known_identities import CONTROL +from volttrontesting.fixtures.volttron_platform_fixtures import build_wrapper +from volttrontesting.utils.utils import get_hostname_and_random_port, get_rand_vip, get_rand_ip_and_port +from volttron.utils import rmq_mgmt +from volttrontesting.utils.platformwrapper import with_os_environ +from volttron.utils.rmq_mgmt import RabbitMQMgmt +from volttron.utils.rmq_setup import start_rabbit + + +@pytest.fixture(scope="module") +def shovel_pubsub_rmq_instances(request, **kwargs): + """ + Create two rmq based volttron instances. One to act as producer of data and one to act as consumer of data + Create a shovel to forward data from producer to consumer + + :return: 2 volttron instances - (producer, consumer) that have a shovel connection between them + """ + source_vip = get_rand_vip() + source_hostname, source_https_port = get_hostname_and_random_port() + source_web_address = 'https://{hostname}:{port}'.format(hostname=source_hostname, port=source_https_port) + source_instance_name = 'volttron1' + source = build_wrapper(source_vip, + ssl_auth=True, + messagebus='rmq', + should_start=False, + bind_web_address=source_web_address, + instance_name=source_instance_name, + **kwargs) + + sink_vip = get_rand_vip() + sink_hostname, sink_https_port = get_hostname_and_random_port() + sink_web_address = 'https://{hostname}:{port}'.format(hostname=sink_hostname, port=sink_https_port) + sink = build_wrapper(sink_vip, + ssl_auth=True, + messagebus='rmq', + should_start=True, + bind_web_address=sink_web_address, + instance_name='volttron2', + **kwargs) + + sink.enable_auto_csr() + link_name = None + try: + # create shovel config and save in volttron home of 'source' instance + pubsub_config = dict() + pubsub_config['dynamic_agent'] = 'test' + shovel_user = '{source_instance}.shovel{sink_host}'.format(source_instance=source_instance_name, + sink_host=sink_hostname) + config_path = create_shovel_config(source.volttron_home, + sink.rabbitmq_config_obj.rabbitmq_config["host"], + sink.rabbitmq_config_obj.rabbitmq_config["amqp-port-ssl"], + sink_https_port, + sink.rabbitmq_config_obj.rabbitmq_config["virtual-host"], + shovel_user, + pubsub_config=pubsub_config) + + # setup shovel from 'source' to 'sink' + source.setup_shovel(config_path) + source.startup_platform(vip_address=source_vip, bind_web_address=source_web_address) + with with_os_environ(source.env): + rmq_mgmt = RabbitMQMgmt() + links = rmq_mgmt.get_shovel_links() + assert links and links[0]['status'] == 'running' + link_name = links[0]['name'] + + except Exception as e: + print("Exception setting up shovel: {}".format(e)) + source.shutdown_platform() + sink.shutdown_platform() + raise e + + yield source, sink + if link_name: + rmq_mgmt.delete_multiplatform_parameter('shovel', link_name) + source.shutdown_platform() + sink.shutdown_platform() + + +def create_shovel_config(vhome, host, port, https_port, vhost, shover_user, pubsub_config=None, rpc_config=None): + content = dict() + shovel = dict() + shovel[host] = {'https-port': https_port, + 'port': port, + 'shovel-user': shover_user, + 'virtual-host': vhost, + } + if pubsub_config: + shovel[host]['pubsub'] = pubsub_config + if rpc_config: + shovel[host]['rpc'] = rpc_config + content['shovel'] = shovel + + import yaml + config_path = os.path.join(vhome, "rabbitmq_shovel_config.yml") + print(f"config_path: {config_path}") + with open(config_path, 'w') as yaml_file: + yaml.dump(content, yaml_file, default_flow_style=False) + return config_path + + +@pytest.fixture(scope="module") +def two_way_shovel_connection(request, **kwargs): + """ + Create two rmq based volttron instances. Create bi-directional data flow channel + by adding 2 shovel connections + + :return: 2 volttron instances - connected through shovels + """ + source_vip = get_rand_vip() + source_hostname, source_https_port = get_hostname_and_random_port() + source_web_address = 'https://{hostname}:{port}'.format(hostname=source_hostname, port=source_https_port) + source_instance_name = 'volttron1' + source = build_wrapper(source_vip, + ssl_auth=True, + messagebus='rmq', + should_start=False, + bind_web_address=source_web_address, + instance_name=source_instance_name, + **kwargs) + + sink_vip = get_rand_vip() + sink_hostname, sink_https_port = get_hostname_and_random_port() + sink_web_address = 'https://{hostname}:{port}'.format(hostname=sink_hostname, port=sink_https_port) + sink_instance_name = 'volttron2' + sink = build_wrapper(sink_vip, + ssl_auth=True, + messagebus='rmq', + should_start=True, + bind_web_address=sink_web_address, + instance_name=sink_instance_name, + **kwargs) + + sink.enable_auto_csr() + source_link_name = None + try: + # create shovel config and save in volttron home of 'source' instance + source_shovel_user = '{source_instance}.shovel{sink_host}'.format(source_instance=source_instance_name, + sink_host=sink_hostname) + rpc_config = dict() + rpc_config[sink_instance_name] = [['dynamic_agent', CONTROL]] + config_path = create_shovel_config(source.volttron_home, + sink.rabbitmq_config_obj.rabbitmq_config["host"], + sink.rabbitmq_config_obj.rabbitmq_config["amqp-port-ssl"], + sink_https_port, + sink.rabbitmq_config_obj.rabbitmq_config["virtual-host"], + source_shovel_user, + rpc_config=rpc_config) + + # setup shovel from 'source' to 'sink' + source.setup_shovel(config_path) + source.startup_platform(vip_address=source_vip, bind_web_address=source_web_address) + source.enable_auto_csr() + + # Check shovel link status + with with_os_environ(source.env): + rmq_mgmt = RabbitMQMgmt() + links = rmq_mgmt.get_shovel_links() + assert links and links[0]['status'] == 'running' + source_link_name = links[0]['name'] + + sink.skip_cleanup = True + sink.shutdown_platform() + sink.skip_cleanup = False + + # Start RabbitMQ broker to establish shovel link + start_rabbit(rmq_home=sink.rabbitmq_config_obj.rmq_home, env=sink.env) + + rpc_config = dict() + rpc_config[source_instance_name] = [[CONTROL, 'dynamic_agent']] + sink_shovel_user = '{source_instance}.shovel{sink_host}'.format(source_instance=sink_instance_name, + sink_host=source_hostname) + + config_path = create_shovel_config(sink.volttron_home, + source.rabbitmq_config_obj.rabbitmq_config["host"], + source.rabbitmq_config_obj.rabbitmq_config["amqp-port-ssl"], + source_https_port, + source.rabbitmq_config_obj.rabbitmq_config["virtual-host"], + sink_shovel_user, + rpc_config=rpc_config) + + sink.setup_shovel(config_path) + sink.startup_platform(vip_address=sink_vip, bind_web_address=sink_web_address) + + # Check shovel link status + with with_os_environ(sink.env): + rmq_mgmt = RabbitMQMgmt() + links = rmq_mgmt.get_shovel_links() + assert links and links[0]['status'] == 'running' + sink_link_name = links[0]['name'] + + except Exception as e: + print("Exception setting up shovel: {}".format(e)) + source.shutdown_platform() + sink.shutdown_platform() + raise e + + yield source, sink + if source_link_name: + with with_os_environ(source.env): + rmq_mgmt = RabbitMQMgmt() + rmq_mgmt.delete_multiplatform_parameter('shovel', source_link_name) + if sink_link_name: + with with_os_environ(sink.env): + rmq_mgmt = RabbitMQMgmt() + rmq_mgmt.delete_multiplatform_parameter('shovel', sink_link_name) + source.shutdown_platform() + sink.shutdown_platform() + + +@pytest.mark.shovel +def test_shovel_pubsub(shovel_pubsub_rmq_instances): + source, sink = shovel_pubsub_rmq_instances + assert source.is_running() + assert sink.is_running() + + subscription_results2 = {} + publisher = source.dynamic_agent + subscriber = sink.dynamic_agent + + def callback2(peer, sender, bus, topic, headers, message): + subscription_results2[topic] = {'headers': headers, 'message': message} + print("platform2 sub results [{}] = {}".format(topic, subscription_results2[topic])) + + subscriber.vip.pubsub.subscribe(peer='pubsub', + prefix='test/campus/building1', + callback=callback2, + all_platforms=True) + + gevent.sleep(1) + for i in range(5): + publisher.vip.pubsub.publish(peer='pubsub', topic='test/campus/building1', message=[{'point': 'value'}]) + gevent.sleep(1) + message = subscription_results2['test/campus/building1']['message'] + assert message == [{'point': 'value'}] + + +@pytest.mark.shovel +def test_shovel_rpc(two_way_shovel_connection): + instance_1, instance_2 = two_way_shovel_connection + assert instance_1.is_running() + assert instance_2.is_running() + + auuid = None + try: + auuid = instance_2.install_agent(vip_identity='listener', + agent_dir=get_examples("ListenerAgent"), + start=True) + + assert auuid is not None + test_agent = instance_1.dynamic_agent + kwargs = {"external_platform": instance_2.instance_name} + agts = test_agent.vip.rpc.call(CONTROL, + 'list_agents', + **kwargs).get(timeout=10) + + assert agts[0]['identity'].startswith('listener') + listener_uuid = agts[0]['uuid'] + test_agent.vip.rpc.call(CONTROL, + 'stop_agent', + listener_uuid, + **kwargs).get(timeout=10) + agt_status = test_agent.vip.rpc.call(CONTROL, + 'agent_status', + listener_uuid, + **kwargs).get(timeout=10) + assert agt_status[1] == 0 + finally: + if instance_2 and auuid: + instance_2.remove_agent(auuid) diff --git a/volttrontesting/utils/platformwrapper.py b/volttrontesting/utils/platformwrapper.py index db84c2fe81..9b212620d4 100644 --- a/volttrontesting/utils/platformwrapper.py +++ b/volttrontesting/utils/platformwrapper.py @@ -44,6 +44,7 @@ from volttrontesting.utils.utils import get_rand_tcp_address from volttrontesting.fixtures.rmq_test_setup import create_rmq_volttron_setup from volttron.utils.rmq_setup import start_rabbit, stop_rabbit +from volttron.utils.rmq_setup import setup_rabbitmq_volttron utils.setup_logging() _log = logging.getLogger(__name__) @@ -1336,13 +1337,31 @@ def setup_federation(self, config_path): :param config_path: path to federation config yml file. """ with with_os_environ(self.env): - _log.debug("Setting up federation using config : {}".format(config_path)) + print(f"VHOME WITH with_os_environ: {os.environ['VOLTTRON_HOME']}") + setup_rabbitmq_volttron('federation', + verbose=False, + prompt=False, + instance_name=self.instance_name, + rmq_conf_file=self.rabbitmq_config_obj.rmq_conf_file, + max_retries=5, + env=self.env) + + + def setup_shovel(self, config_path): + """ + Set up shovel using the given config path + :param config_path: path to shovel config yml file. + """ + with with_os_environ(self.env): + print(f"VHOME WITH with_os_environ: {os.environ['VOLTTRON_HOME']}") + setup_rabbitmq_volttron('shovel', + verbose=False, + prompt=False, + instance_name=self.instance_name, + rmq_conf_file=self.rabbitmq_config_obj.rmq_conf_file, + max_retries=5, + env=self.env) - cmd = ['vcfg'] - cmd.extend(['--vhome', self.volttron_home, '--instance-name', self.instance_name, '--rabbitmq', - "federation", config_path]) - execute_command(cmd, env=self.env, logger=_log, - err_prefix="Error setting up federation") def restart_platform(self): with with_os_environ(self.env):