Skip to content
This repository was archived by the owner on Feb 10, 2018. It is now read-only.

Commit

Permalink
Adding _rpc method for generic rpc calls and profile
Browse files Browse the repository at this point in the history
  • Loading branch information
dbarrosop committed Mar 26, 2017
1 parent f67ad16 commit 33d8536
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 0 deletions.
20 changes: 20 additions & 0 deletions napalm_junos/junos.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

# import third party lib
from lxml.builder import E
from lxml import etree

from jnpr.junos import Device
from jnpr.junos.utils.config import Config
Expand Down Expand Up @@ -77,6 +78,8 @@ def __init__(self, hostname, username, password, timeout=60, optional_args=None)

self.device = Device(hostname, user=username, password=password, port=self.port)

self.profile = ["junos"]

def open(self):
"""Open the connection wit the device."""
try:
Expand Down Expand Up @@ -110,6 +113,23 @@ def _unlock(self):
self.device.cu.unlock()
self.locked = False

def _rpc(self, get, child=None, **kwargs):
"""
This allows you to construct an arbitrary RPC call to retreive common stuff. For example:
Configuration: get: "<get-configuration/>"
Interface information: get: "<get-interface-information/>"
A particular interfacece information:
get: "<get-interface-information/>"
child: "<interface-name>ge-0/0/0</interface-name>"
"""
rpc = etree.fromstring(get)

if child:
rpc.append(etree.fromstring(child))

response = self.device.execute(rpc)
return etree.tostring(response)

def is_alive(self):
# evaluate the state of the underlying SSH connection
# and also the NETCONF status from PyEZ
Expand Down
185 changes: 185 additions & 0 deletions tags
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
!_TAG_FILE_FORMAT 2 /extended format; --format=1 will not append ;" to lines/
!_TAG_FILE_SORTED 1 /0=unsorted, 1=sorted, 2=foldcase/
!_TAG_PROGRAM_AUTHOR Darren Hiebert /dhiebert@users.sourceforge.net/
!_TAG_PROGRAM_NAME Exuberant Ctags //
!_TAG_PROGRAM_URL http://ctags.sourceforge.net /official site/
!_TAG_PROGRAM_VERSION 5.8 //
.travis.yml .travis.yml 1;" F
BaseTestDouble test/unit/conftest.py /^from napalm_base.test.double import BaseTestDouble$/;" i
BaseTestGetters test/unit/test_getters.py /^from napalm_base.test.getters import BaseTestGetters$/;" i
C napalm_junos/junos.py /^import napalm_junos.constants as C$/;" i
CommandErrorException napalm_junos/junos.py /^from napalm_base.exceptions import CommandErrorException$/;" i
CommandTimeoutException napalm_junos/junos.py /^from napalm_base.exceptions import CommandTimeoutException$/;" i
Config napalm_junos/junos.py /^from jnpr.junos.utils.config import Config$/;" i
ConfigLoadError napalm_junos/junos.py /^from jnpr.junos.exception import ConfigLoadError$/;" i
ConnectTimeoutError napalm_junos/junos.py /^from jnpr.junos.exception import ConnectTimeoutError$/;" i
ConnectionException napalm_junos/junos.py /^from napalm_base.exceptions import ConnectionException$/;" i
Device napalm_junos/junos.py /^from jnpr.junos import Device$/;" i
E napalm_junos/junos.py /^from lxml.builder import E$/;" i
FactoryLoader napalm_junos/utils/junos_views.py /^from jnpr.junos.factory import loadyaml, FactoryLoader$/;" i
FakeConnection test/unit/TestJunOSDriver.py /^class FakeConnection:$/;" c
FakeConnection test/unit/conftest.py /^class FakeConnection:$/;" c
FakeConnectionRPCObject test/unit/TestJunOSDriver.py /^class FakeConnectionRPCObject:$/;" c
FakeConnectionRPCObject test/unit/conftest.py /^class FakeConnectionRPCObject:$/;" c
FakeJunOSDevice test/unit/TestJunOSDriver.py /^class FakeJunOSDevice:$/;" c
FakeJunOSDevice test/unit/conftest.py /^class FakeJunOSDevice(BaseTestDouble):$/;" c
FakeRPCObject test/unit/TestJunOSDriver.py /^class FakeRPCObject:$/;" c
FakeRPCObject test/unit/conftest.py /^class FakeRPCObject:$/;" c
JunOSDriver napalm_junos/__init__.py /^from napalm_junos.junos import JunOSDriver # noqa$/;" i
JunOSDriver napalm_junos/junos.py /^class JunOSDriver(NetworkDriver):$/;" c
JunOSDriver test/unit/TestJunOSDriver.py /^from napalm_junos.junos import JunOSDriver$/;" i
MergeConfigException napalm_junos/junos.py /^from napalm_base.exceptions import MergeConfigException$/;" i
NetworkDriver napalm_junos/junos.py /^from napalm_base.base import NetworkDriver$/;" i
OC_NETWORK_INSTANCE_TYPE_MAP napalm_junos/constants.py /^OC_NETWORK_INSTANCE_TYPE_MAP = {$/;" v
PatchedJunOSDriver test/unit/conftest.py /^class PatchedJunOSDriver(junos.JunOSDriver):$/;" c
RPCReply test/unit/TestJunOSDriver.py /^ class RPCReply:$/;" c function:FakeConnectionRPCObject.response
RPCReply test/unit/conftest.py /^ class RPCReply:$/;" c function:FakeConnectionRPCObject.response
ReplaceConfigException napalm_junos/junos.py /^from napalm_base.exceptions import ReplaceConfigException$/;" i
RpcError napalm_junos/junos.py /^from jnpr.junos.exception import RpcError$/;" i
RpcTimeoutError napalm_junos/junos.py /^from jnpr.junos.exception import RpcTimeoutError$/;" i
TestConfigJunOSDriver test/unit/TestJunOSDriver.py /^class TestConfigJunOSDriver(unittest.TestCase, TestConfigNetworkDriver):$/;" c
TestConfigNetworkDriver test/unit/TestJunOSDriver.py /^from napalm_base.test.base import TestConfigNetworkDriver, TestGettersNetworkDriver$/;" i
TestGetter test/unit/test_getters.py /^class TestGetter(BaseTestGetters):$/;" c
TestGetterJunOSDriver test/unit/TestJunOSDriver.py /^class TestGetterJunOSDriver(unittest.TestCase, TestGettersNetworkDriver):$/;" c
TestGettersNetworkDriver test/unit/TestJunOSDriver.py /^from napalm_base.test.base import TestConfigNetworkDriver, TestGettersNetworkDriver$/;" i
TestJunOSDriver.py test/unit/TestJunOSDriver.py 1;" F
_YAML_ napalm_junos/utils/junos_views.py /^_YAML_ = splitext(__file__)[0] + '.yml'$/;" v
__author__ setup.py /^__author__ = 'David Barroso <dbarrosop@dravetech.com>'$/;" v
__call__ test/unit/TestJunOSDriver.py /^ __call__ = response$/;" v class:FakeConnectionRPCObject
__call__ test/unit/TestJunOSDriver.py /^ __call__ = response$/;" v class:FakeRPCObject
__call__ test/unit/conftest.py /^ __call__ = response$/;" v class:FakeConnectionRPCObject
__call__ test/unit/conftest.py /^ __call__ = response$/;" v class:FakeRPCObject
__getattr__ test/unit/TestJunOSDriver.py /^ def __getattr__(self, item):$/;" m class:FakeRPCObject file:
__getattr__ test/unit/conftest.py /^ def __getattr__(self, item):$/;" m class:FakeRPCObject file:
__init__ napalm_junos/junos.py /^ def __init__(self, hostname, username, password, timeout=60, optional_args=None):$/;" m class:JunOSDriver
__init__ test/unit/TestJunOSDriver.py /^ def __init__(self, reply):$/;" m class:FakeConnectionRPCObject.response.RPCReply
__init__ test/unit/TestJunOSDriver.py /^ def __init__(self):$/;" m class:FakeJunOSDevice
__init__ test/unit/TestJunOSDriver.py /^ def __init__(self, device):$/;" m class:FakeRPCObject
__init__ test/unit/TestJunOSDriver.py /^ def __init__(self, rpc):$/;" m class:FakeConnection
__init__ test/unit/TestJunOSDriver.py /^ def __init__(self, rpc):$/;" m class:FakeConnectionRPCObject
__init__ test/unit/conftest.py /^ def __init__(self, reply):$/;" m class:FakeConnectionRPCObject.response.RPCReply
__init__ test/unit/conftest.py /^ def __init__(self):$/;" m class:FakeJunOSDevice
__init__ test/unit/conftest.py /^ def __init__(self, device):$/;" m class:FakeRPCObject
__init__ test/unit/conftest.py /^ def __init__(self, hostname, username, password, timeout=60, optional_args=None):$/;" m class:PatchedJunOSDriver
__init__ test/unit/conftest.py /^ def __init__(self, rpc):$/;" m class:FakeConnection
__init__ test/unit/conftest.py /^ def __init__(self, rpc):$/;" m class:FakeConnectionRPCObject
__init__.py napalm_junos/__init__.py 1;" F
__init__.py napalm_junos/utils/__init__.py 1;" F
__version__ napalm_junos/__init__.py /^ __version__ = "Not installed"$/;" v
__version__ napalm_junos/__init__.py /^ __version__ = pkg_resources.get_distribution('napalm-junos').version$/;" v
_get_address_family napalm_junos/junos.py /^ def _get_address_family(table):$/;" m class:JunOSDriver
_load_candidate napalm_junos/junos.py /^ def _load_candidate(self, filename, config, overwrite):$/;" m class:JunOSDriver
_loadyaml_bypass napalm_junos/utils/junos_views.py /^def _loadyaml_bypass(yaml_str):$/;" f
_lock napalm_junos/junos.py /^ def _lock(self):$/;" m class:JunOSDriver
_parse_route_stats napalm_junos/junos.py /^ def _parse_route_stats(self, neighbor):$/;" m class:JunOSDriver
_parse_value napalm_junos/junos.py /^ def _parse_value(value):$/;" m class:JunOSDriver
_preprocess_yml napalm_junos/utils/junos_views.py /^def _preprocess_yml(path):$/;" f
_rpc napalm_junos/junos.py /^ def _rpc(self, get, child=None, **kwargs):$/;" m class:JunOSDriver
_unlock napalm_junos/junos.py /^ def _unlock(self):$/;" m class:JunOSDriver
author setup.py /^ author="David Barroso, Mircea Ulinic",$/;" v
author_email setup.py /^ author_email="dbarrosop@dravetech.com, mircea@cloudflare.com",$/;" v
bind test/unit/conftest.py /^ def bind(*args, **kvargs):$/;" m class:FakeJunOSDevice
build_prefix_limit napalm_junos/junos.py /^ def build_prefix_limit(**args):$/;" f function:JunOSDriver.get_bgp_config
classifiers setup.py /^ classifiers=[$/;" v
cli napalm_junos/junos.py /^ def cli(self, commands):$/;" m class:JunOSDriver
cli test/unit/TestJunOSDriver.py /^ def cli(self, command=''):$/;" m class:FakeJunOSDevice
cli test/unit/conftest.py /^ def cli(self, command=''):$/;" m class:FakeJunOSDevice
close napalm_junos/junos.py /^ def close(self):$/;" m class:JunOSDriver
close test/unit/conftest.py /^ def close(self):$/;" m class:FakeJunOSDevice
collections napalm_junos/junos.py /^import collections$/;" i
commit_config napalm_junos/junos.py /^ def commit_config(self):$/;" m class:JunOSDriver
compare_config napalm_junos/junos.py /^ def compare_config(self):$/;" m class:JunOSDriver
conftest.py test/unit/conftest.py 1;" F
constants.py napalm_junos/constants.py 1;" F
deepcopy napalm_junos/junos.py /^from copy import deepcopy$/;" i
description setup.py /^ description="Network Automation and Programmability Abstraction Layer with Multivendor support",$/;" v
discard_config napalm_junos/junos.py /^ def discard_config(self):$/;" m class:JunOSDriver
etree napalm_junos/junos.py /^from lxml import etree$/;" i
fin test/unit/conftest.py /^ def fin():$/;" f function:set_device_parameters
find_packages setup.py /^from setuptools import setup, find_packages$/;" i
get_arp_table napalm_junos/junos.py /^ def get_arp_table(self):$/;" m class:JunOSDriver
get_bgp_config napalm_junos/junos.py /^ def get_bgp_config(self, group='', neighbor=''):$/;" m class:JunOSDriver
get_bgp_neighbors napalm_junos/junos.py /^ def get_bgp_neighbors(self):$/;" m class:JunOSDriver
get_bgp_neighbors_detail napalm_junos/junos.py /^ def get_bgp_neighbors_detail(self, neighbor_address=''):$/;" m class:JunOSDriver
get_config napalm_junos/junos.py /^ def get_config(self, retrieve='all'):$/;" m class:JunOSDriver
get_config test/unit/TestJunOSDriver.py /^ def get_config(self, get_cmd=None, filter_xml=None, options={}):$/;" m class:FakeRPCObject
get_config test/unit/conftest.py /^ def get_config(self, get_cmd=None, filter_xml=None, options={}):$/;" m class:FakeRPCObject
get_environment napalm_junos/junos.py /^ def get_environment(self):$/;" m class:JunOSDriver
get_facts napalm_junos/junos.py /^ def get_facts(self):$/;" m class:JunOSDriver
get_interfaces napalm_junos/junos.py /^ def get_interfaces(self):$/;" m class:JunOSDriver
get_interfaces_counters napalm_junos/junos.py /^ def get_interfaces_counters(self):$/;" m class:JunOSDriver
get_interfaces_ip napalm_junos/junos.py /^ def get_interfaces_ip(self):$/;" m class:JunOSDriver
get_lldp_neighbors napalm_junos/junos.py /^ def get_lldp_neighbors(self):$/;" m class:JunOSDriver
get_lldp_neighbors_detail napalm_junos/junos.py /^ def get_lldp_neighbors_detail(self, interface=''):$/;" m class:JunOSDriver
get_mac_address_table napalm_junos/junos.py /^ def get_mac_address_table(self):$/;" m class:JunOSDriver
get_network_instances napalm_junos/junos.py /^ def get_network_instances(self, name=''):$/;" m class:JunOSDriver
get_ntp_peers napalm_junos/junos.py /^ def get_ntp_peers(self):$/;" m class:JunOSDriver
get_ntp_servers napalm_junos/junos.py /^ def get_ntp_servers(self):$/;" m class:JunOSDriver
get_ntp_stats napalm_junos/junos.py /^ def get_ntp_stats(self):$/;" m class:JunOSDriver
get_optics napalm_junos/junos.py /^ def get_optics(self):$/;" m class:JunOSDriver
get_probes_config napalm_junos/junos.py /^ def get_probes_config(self):$/;" m class:JunOSDriver
get_probes_results napalm_junos/junos.py /^ def get_probes_results(self):$/;" m class:JunOSDriver
get_route_to napalm_junos/junos.py /^ def get_route_to(self, destination='', protocol=''):$/;" m class:JunOSDriver
get_snmp_information napalm_junos/junos.py /^ def get_snmp_information(self):$/;" m class:JunOSDriver
get_users napalm_junos/junos.py /^ def get_users(self):$/;" m class:JunOSDriver
helpers napalm_junos/junos.py /^import napalm_base.helpers$/;" i
include_package_data setup.py /^ include_package_data=True,$/;" v
install_reqs setup.py /^install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1())$/;" v
install_requires setup.py /^ install_requires=reqs,$/;" v
is_alive napalm_junos/junos.py /^ def is_alive(self):$/;" m class:JunOSDriver
is_alive test/unit/conftest.py /^ def is_alive(self):$/;" m class:PatchedJunOSDriver
junos test/unit/conftest.py /^from napalm_junos import junos$/;" i
junos.py napalm_junos/junos.py 1;" F
junos_views napalm_junos/junos.py /^from napalm_junos.utils import junos_views$/;" i
junos_views.py napalm_junos/utils/junos_views.py 1;" F
junos_views.yml napalm_junos/utils/junos_views.yml 1;" F
load_merge_candidate napalm_junos/junos.py /^ def load_merge_candidate(self, filename=None, config=None):$/;" m class:JunOSDriver
load_replace_candidate napalm_junos/junos.py /^ def load_replace_candidate(self, filename=None, config=None):$/;" m class:JunOSDriver
loadyaml napalm_junos/utils/junos_views.py /^from jnpr.junos.factory import loadyaml, FactoryLoader$/;" i
log napalm_junos/junos.py /^log = logging.getLogger(__file__)$/;" v
logging napalm_junos/junos.py /^import logging$/;" i
lxml test/unit/TestJunOSDriver.py /^import lxml$/;" i
lxml test/unit/conftest.py /^import lxml$/;" i
name setup.py /^ name="napalm-junos",$/;" v
napalm_base napalm_junos/junos.py /^import napalm_base.helpers$/;" i
napalm_junos napalm_junos/junos.py /^import napalm_junos.constants as C$/;" i
noqa napalm_junos/__init__.py /^from napalm_junos.junos import JunOSDriver # noqa$/;" i
noqa napalm_junos/constants.py /^from napalm_base.constants import * # noqa$/;" i
open napalm_junos/junos.py /^ def open(self):$/;" m class:JunOSDriver
open test/unit/conftest.py /^ def open(self):$/;" m class:FakeJunOSDevice
packages setup.py /^ packages=find_packages(),$/;" v
parent_conftest test/unit/conftest.py /^from napalm_base.test import conftest as parent_conftest$/;" i
parse_requirements setup.py /^from pip.req import parse_requirements$/;" i
ping napalm_junos/junos.py /^ def ping(self, destination, source=C.PING_SOURCE, ttl=C.PING_TTL,$/;" m class:JunOSDriver
pkg_resources napalm_junos/__init__.py /^import pkg_resources$/;" i
py23_compat napalm_junos/junos.py /^from napalm_base.utils import py23_compat$/;" i
py23_compat napalm_junos/utils/junos_views.py /^from napalm_base.utils import py23_compat$/;" i
pytest test/unit/conftest.py /^import pytest$/;" i
pytest test/unit/test_getters.py /^import pytest$/;" i
pytest_generate_tests test/unit/conftest.py /^def pytest_generate_tests(metafunc):$/;" f
re napalm_junos/junos.py /^import re$/;" i
re napalm_junos/utils/junos_views.py /^import re$/;" i
read_txt_file test/unit/TestJunOSDriver.py /^ def read_txt_file(self, filename):$/;" m class:FakeJunOSDevice
reqs setup.py /^reqs = [str(ir.req) for ir in install_reqs]$/;" v
response test/unit/TestJunOSDriver.py /^ def response(self, **rpc_args):$/;" m class:FakeRPCObject
response test/unit/TestJunOSDriver.py /^ def response(self, non_std_command=None):$/;" m class:FakeConnectionRPCObject
response test/unit/conftest.py /^ def response(self, **rpc_args):$/;" m class:FakeRPCObject
response test/unit/conftest.py /^ def response(self, non_std_command=None):$/;" m class:FakeConnectionRPCObject
rollback napalm_junos/junos.py /^ def rollback(self):$/;" m class:JunOSDriver
setUpClass test/unit/TestJunOSDriver.py /^ def setUpClass(cls):$/;" m class:TestConfigJunOSDriver
setUpClass test/unit/TestJunOSDriver.py /^ def setUpClass(cls):$/;" m class:TestGetterJunOSDriver
set_device_parameters test/unit/conftest.py /^def set_device_parameters(request):$/;" f
setup setup.py /^from setuptools import setup, find_packages$/;" i
setup.py setup.py 1;" F
splitext napalm_junos/utils/junos_views.py /^from os.path import splitext$/;" i
string_parsers napalm_junos/junos.py /^from napalm_base.utils import string_parsers$/;" i
test_getters.py test/unit/test_getters.py 1;" F
traceroute napalm_junos/junos.py /^ def traceroute(self,$/;" m class:JunOSDriver
unicode_literals napalm_junos/constants.py /^from __future__ import unicode_literals$/;" i
unicode_literals napalm_junos/junos.py /^from __future__ import unicode_literals$/;" i
unittest test/unit/TestJunOSDriver.py /^import unittest$/;" i
update_dict napalm_junos/junos.py /^ def update_dict(d, u): # for deep dictionary update$/;" f function:JunOSDriver.get_bgp_config
url setup.py /^ url="https:\/\/github.com\/napalm-automation\/napalm-junos",$/;" v
uuid setup.py /^import uuid$/;" i
version setup.py /^ version="0.6.6",$/;" v
yaml napalm_junos/utils/junos_views.py /^import yaml$/;" i

0 comments on commit 33d8536

Please sign in to comment.