-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathdatamodel.py
205 lines (167 loc) · 7.19 KB
/
datamodel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# Copyright © 2016-2019 CZ.NIC, z. s. p. o.
#
# This file is part of Yangson.
#
# Yangson is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# Yangson is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with Yangson. If not, see <http://www.gnu.org/licenses/>.
"""Basic access to the Yangson library.
This module implements the following class:
* DataModel: Basic entry point to the YANG data model.
"""
import hashlib
import json
from typing import Optional, Tuple
import xml.etree.ElementTree as ET
from .enumerations import ContentType
from .exceptions import BadYangLibraryData
from .instance import (InstanceRoute, InstanceIdParser, ResourceIdParser,
RootNode)
from .schemadata import SchemaData, SchemaContext
from .schemanode import DataNode, SchemaTreeNode, RawObject, SchemaNode
from .typealiases import DataPath, SchemaPath
class DataModel:
    """Basic user-level entry point to Yangson library.

    Attributes:
        yang_library: YANG library data decoded from the JSON text.
        schema_data: ``SchemaData`` object created from the YANG library
            data and the module search path.
        schema: ``SchemaTreeNode`` from which all schema lookups in this
            class start.
    """

    @classmethod
    def from_file(cls, name: str, mod_path: Tuple[str, ...] = (".",),
                  description: Optional[str] = None) -> "DataModel":
        """Initialize the data model from a file with YANG library data.

        Args:
            name: Name of a file with YANG library data.
            mod_path: Tuple of directories where to look for YANG modules.
            description: Optional description of the data model.

        Returns:
            The data model instance.

        Raises:
            OSError: If the file cannot be opened or read.
            The same exceptions as the class constructor.
        """
        with open(name, encoding="utf-8") as infile:
            yltxt = infile.read()
        return cls(yltxt, mod_path, description)

    def __init__(self, yltxt: str, mod_path: Tuple[str, ...] = (".",),
                 description: Optional[str] = None) -> None:
        """Initialize the class instance.

        Args:
            yltxt: JSON text with YANG library data.
            mod_path: Tuple of directories where to look for YANG modules.
            description: Optional description of the data model. If it is
                empty or ``None``, a description is derived from the
                ``module-set-id`` found in the YANG library data.

        Raises:
            BadYangLibraryData: If YANG library data is invalid.
            FeaturePrerequisiteError: If a pre-requisite feature isn't
                supported.
            MultipleImplementedRevisions: If multiple revisions of an
                implemented module are listed in YANG library.
            ModuleNotFound: If a YANG module wasn't found in any of the
                directories specified in `mod_path`.
        """
        try:
            self.yang_library = json.loads(yltxt)
        except json.JSONDecodeError as e:
            # The decoder traceback is of no use to the caller.
            raise BadYangLibraryData(str(e)) from None
        self.schema_data = SchemaData(self.yang_library, mod_path)
        self.schema = SchemaTreeNode(self.schema_data)
        self.schema._ctype = ContentType.all
        self._build_schema()
        if description:
            self.schema.description = description
            return
        # No explicit description: fall back to the module set ID. A
        # missing member is reported as invalid YANG library data rather
        # than as a bare KeyError.
        try:
            set_id = (self.yang_library["ietf-yang-library:modules-state"]
                      ["module-set-id"])
        except KeyError as e:
            raise BadYangLibraryData("missing " + str(e)) from None
        self.schema.description = "Data model ID: " + set_id

    def module_set_id(self) -> str:
        """Compute unique id of YANG modules comprising the data model.

        The keys of ``schema_data.modules`` are each joined with ``@``,
        sorted, concatenated and hashed with SHA-1.

        Returns:
            String consisting of hexadecimal digits.
        """
        fnames = sorted("@".join(m) for m in self.schema_data.modules)
        return hashlib.sha1("".join(fnames).encode("ascii")).hexdigest()

    def from_raw(self, robj: RawObject) -> RootNode:
        """Create an instance node from a raw data tree.

        Args:
            robj: Dictionary representing a raw data tree.

        Returns:
            Root instance node.
        """
        cooked = self.schema.from_raw(robj)
        return RootNode(
            cooked, self.schema, self.schema_data, cooked.timestamp)

    def from_xml(self, root: ET.Element) -> RootNode:
        """Create an instance node from an XML element tree.

        Args:
            root: XML element (``xml.etree.ElementTree.Element``) holding
                the raw data tree.

        Returns:
            Root instance node.
        """
        cooked = self.schema.from_xml(root)
        return RootNode(
            cooked, self.schema, self.schema_data, cooked.timestamp)

    def get_schema_node(self, path: SchemaPath) -> Optional[SchemaNode]:
        """Return the schema node addressed by a schema path.

        Args:
            path: Schema path.

        Returns:
            Schema node if found in the schema, or ``None``.

        Raises:
            InvalidSchemaPath: If the schema path is invalid.
        """
        return self.schema.get_schema_descendant(
            self.schema_data.path2route(path))

    def get_data_node(self, path: DataPath) -> Optional[DataNode]:
        """Return the data node addressed by a data path.

        Args:
            path: Data path.

        Returns:
            Data node if found in the schema, or ``None``.

        Raises:
            InvalidSchemaPath: If the schema path is invalid.
        """
        addr = self.schema_data.path2route(path)
        node = self.schema
        for p in addr:
            # Descend one data child per route component; give up as soon
            # as a component has no matching child.
            node = node.get_data_child(*p)
            if node is None:
                return None
        return node

    def ascii_tree(self, no_types: bool = False,
                   val_count: bool = False) -> str:
        """Generate ASCII art representation of the schema tree.

        Args:
            no_types: Suppress output of data type info.
            val_count: Show accumulated validation counts.

        Returns:
            String with the ASCII tree.
        """
        return self.schema._ascii_tree("", no_types, val_count)

    def clear_val_counters(self) -> None:
        """Clear validation counters in the entire schema tree."""
        self.schema.clear_val_counters()

    def parse_instance_id(self, text: str) -> InstanceRoute:
        """Parse an instance identifier with ``InstanceIdParser``.

        Args:
            text: Instance identifier as a string.

        Returns:
            The result of the parser's ``parse()`` method.
        """
        return InstanceIdParser(text).parse()

    def parse_resource_id(self, text: str) -> InstanceRoute:
        """Parse a resource identifier with ``ResourceIdParser``.

        The parser is given the schema of this data model in addition to
        the text.

        Args:
            text: Resource identifier as a string.

        Returns:
            The result of the parser's ``parse()`` method.
        """
        return ResourceIdParser(text, self.schema).parse()

    def schema_digest(self) -> str:
        """Generate schema digest (to be used primarily by clients).

        Returns:
            Condensed information about the schema in JSON format.
        """
        res = self.schema._node_digest()
        # The top-level digest entry is always marked as config.
        res["config"] = True
        return json.dumps(res)

    def _build_schema(self) -> None:
        """Populate the schema tree from all modules of the data model.

        Two passes are made over ``schema_data._module_sequence``: the
        first one handles the substatements of every module, the second
        one applies the top-level ``augment`` statements. Finally, the
        schema is post-processed and schema patterns are created.
        """
        sdata = self.schema_data
        for mid in sdata._module_sequence:
            sctx = SchemaContext(sdata, sdata.namespace(mid), mid)
            self.schema._handle_substatements(
                sdata.modules[mid].statement, sctx)
        # NOTE(review): augments are deferred to a separate pass,
        # presumably so that their target nodes already exist - confirm.
        for mid in sdata._module_sequence:
            sctx = SchemaContext(sdata, sdata.namespace(mid), mid)
            mod = sdata.modules[mid].statement
            for aug in mod.find_all("augment"):
                self.schema._augment_stmt(aug, sctx)
        self.schema._post_process()
        self.schema._make_schema_patterns()