forked from RedHatQE/firewatch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjira_base.py
338 lines (282 loc) · 12.3 KB
/
jira_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import json
from typing import Any
from typing import Optional
from jira import Issue
from jira import JIRA
from jira.exceptions import JIRAError
from simple_logger.logger import get_logger
class Jira:
def __init__(self, jira_config_path: str) -> None:
"""
Constructs the Jira object used for authenticating and interacting with a Jira server.
Args:
jira_config_path (str): The path to the configuration file that hold authentication credentials.
"""
self.logger = get_logger(__name__)
self.proxies: dict[str, str] = {}
with open(jira_config_path) as jira_config_file:
jira_config = json.load(jira_config_file)
self.url = jira_config.get("url")
self.token = jira_config.get("token")
if "proxies" in jira_config:
self.proxies = jira_config.get("proxies")
self.connection = JIRA(
server=self.url,
token_auth=self.token,
proxies=self.proxies,
)
else:
self.connection = JIRA(
server=self.url,
token_auth=self.token,
)
self.logger.info("Jira authentication successful...")
def create_issue(
self,
project: str,
summary: str,
description: str,
issue_type: str,
component: Optional[list[str]] = None,
epic: Optional[str] = None,
file_attachments: Optional[list[str]] = None,
labels: Optional[list[Optional[str]]] = None,
affects_version: Optional[str] = None,
assignee: Optional[str] = None,
priority: Optional[str] = None,
security_level: Optional[str] = None,
close_issue: Optional[bool] = False,
) -> Issue:
"""
Used to create a Jira issue and attach any given files to that issue.
Args:
project (str): The Jira project the issue should be filed under.
summary (str): Title or summary of the issue.
description (str): Description of the issue.
issue_type (str): Issue type (Bug, Task, etc.).
component (Optional[list[str]]): The component or components you'd like the bug to be associated with. Must be a comma deliminated string to specify multiple components. If not supplied, the bug will not have a component.
epic (Optional[str]): The epic ID (PROJECT-8) the new issue should be a part of. If not supplied, the issue will not be associated with an epic.
file_attachments (Optional[list[str]]): An optional list of file paths. Each file in the list will be attached to the issue.
labels (list[Optional[str]]): An optional list of labels to add to the issue.
affects_version (Optional[str]): Value for version affected. Bugs created using this will populate the "Affects Version/s" field in Jira.
assignee (Optional[str]): An optional string for the assignee of an issue. Should be the email address of the user.
priority (Optional[str]): An optional string representing the desired priority of the issue being created.
security_level (Optional[str]): An optional string representing the desired security level of the issue being created.
close_issue (Optional[bool]): Close issue if set to True
Returns:
Issue: A Jira Issue object.
"""
issue_dict = {
"project": {"key": project},
"summary": summary,
"description": description,
"issuetype": {"name": issue_type},
}
labels = [] if not labels else labels
if labels:
# MyPy spits out an odd error here unless ignored.
issue_dict.update({"labels": labels}) # type: ignore
if component:
components = []
for comp in component:
components.append({"name": comp})
# MyPy spits out an odd error here unless ignored.
issue_dict.update({"components": components}) # type: ignore
if affects_version:
issue_dict.update({"versions": [{"name": affects_version}]}) # type: ignore
if priority:
issue_dict.update({"priority": {"name": priority}})
if security_level:
security_id = self._get_security_level_id(
security_level=security_level,
project_key=project,
)
if security_id:
issue_dict.update({"security": {"id": security_id}})
self.logger.info(
"A Jira issue will be reported.",
)
issue = self.connection.create_issue(issue_dict)
self.logger.info(
f"{issue} has been reported to Jira: {self.url}/browse/{issue}",
)
if file_attachments is not None:
for file_path in file_attachments:
self.connection.add_attachment(issue=issue.key, attachment=file_path)
self.logger.info(f"Attachment {file_path} has been uploaded to {issue}")
if epic is not None:
epic_search = self.connection.search_issues(f'issue="{epic}"', maxResults=False)
if len(epic_search) == 1:
epic_id = epic_search[0].id
self.connection.add_issues_to_epic(
epic_id=epic_id,
issue_keys=issue.key,
)
else:
self.logger.error(f"Error finding Jira ID of epic {epic}")
if assignee is not None:
self.assign_issue(user_email=assignee, issue=issue.key)
self.logger.info(f"Issue {issue} has been assigned to user {assignee}")
if close_issue:
self.connection.transition_issue(
issue=issue.key,
transition="closed",
comment="Closed by [firewatch|https://github.com/CSPI-QE/firewatch].",
)
return issue
def search(self, jql_query: str) -> list[Any]:
"""
Performs a Jira JQL query using the Jira connection and returns a list of issues, including all fields.
Args:
jql_query (str): JQL query to run.
Returns:
list[Any]: List of issues that are returned from the query.
"""
return self.connection.search_issues(jql_query, maxResults=False)
def search_issues(self, jql_query: str) -> list[str]:
"""
Performs a Jira JQL query using the Jira connection and returns a list of strings representing issue keys.
Args:
jql_query (str): JQL query to run.
Returns:
list[str]: List of issues that are returned from the query.
"""
issues = []
results = self.search(jql_query)
for issue in results:
issues.append(issue.key)
return issues
def comment(self, issue_id: str, comment: str) -> None:
"""
Comments on the issue_id.
Args:
issue_id (str): Issue to comment on.
comment (str): Comment to add to issue.
"""
self.connection.add_comment(issue_id, comment)
def relate_issues(self, inward_issue: str, outward_issue: str) -> bool:
"""
Used to relate two issues in Jira.
Args:
inward_issue (str): The first issue you'd like to relate to the second issue.
outward_issue (str): The second issue you'd like to relate to the first issue.
Returns:
bool: True if issues related successfully, False otherwise.
"""
try:
self.connection.create_issue_link(
type="relates to",
inwardIssue=inward_issue,
outwardIssue=outward_issue,
)
self.logger.info(
f"Issue {inward_issue} and issue {outward_issue} related successfully",
)
return True
except Exception as ex:
self.logger.error(f"Failure relating {inward_issue} with {outward_issue}")
self.logger.error(ex)
return False
def project_exists(self, project_key: str) -> bool:
"""
Used to validate that the "project_key" exists in the Jira server.
Args:
project_key (str): Jira project key you'd like to check.
Returns:
bool: True if project exists, False otherwise.
"""
try:
project = self.connection.project(project_key)
if project:
self.logger.debug(f"Jira project {project_key} exists...")
return True
else:
self.logger.error(
f"Jira project {project_key} does not exist on {self.url}...",
)
return False
except JIRAError as e:
if e.status_code == 404:
self.logger.error(
f"Jira project {project_key} does not exist on {self.url}...",
)
return False
else:
self.logger.error(f"Error: {e.text}")
return False
def assign_issue(self, user_email: str, issue: str) -> bool:
"""
Assigns a given issue to the user specified.
Args:
user_email (str): A string value representing the email address of use the issue should be assigned to.
issue (str): A string value representing the issue that should be assigned.
Returns:
bool: True if the issue has been assigned successfully, False otherwise.
"""
# Assign the issue
try:
return self.connection.assign_issue(issue=issue, assignee=user_email)
except Exception as ex:
self.logger.error(f"Unable to assign issue {issue} to user {user_email}.")
self.logger.error(ex)
return False
def _get_security_level_id(
self,
security_level: str,
project_key: str,
) -> Optional[str]:
"""
Used to get the security level ID for a given security level.
Args:
security_level (str): The security level you'd like to get the ID for.
project_key (str): The project key the security level is associated with.
Returns:
Optional[str]: The security level ID.
"""
project = self.connection.project(project_key)
security_levels = self.connection.project_issue_security_level_scheme(
project.id,
).levels
for level in security_levels:
if level.name.lower() == security_level.lower():
return level.id
self.logger.error(
f"Security level {security_level} not found in {project_key}, no security level will be applied.",
)
return None
def get_issue_by_id_or_key(self, issue: str) -> Issue:
"""
Get a Jira Issue object from the given issue id or key field value.
Args:
issue (str): The ID or key field value of the Jira Issue object to return.
Returns:
Issue: A Jira Issue object.
"""
return self.connection.issue(id=issue)
def add_labels_to_issue(self, issue_id_or_key: str, labels: list[str]) -> Issue:
"""
Append the given labels to a Jira issue, as identified by the issue's key or ID field value.
If the label already exists on the issue, it is not duplicated.
Returns the value of the modified issue.
Args:
issue_id_or_key (str): The ID or key field value of the Jira Issue object to return.
labels: list[str]: The list of labels to add to the Jira issue.
Returns:
Issue: A Jira Issue object.
"""
issue = self.get_issue_by_id_or_key(issue_id_or_key)
try:
issue.update(update={"labels": [{"add": label} for label in labels]})
# Check if the error is a 400 code and potentially due to user permissions.
except JIRAError as error:
if error.status_code == 400:
self.logger.error(
f"Failed to add labels {labels} to issue {issue_id_or_key}. Error: {error.text}",
)
self.logger.info(
"This error could be caused by missing permissions on the Jira user."
'Please see the "Jira User Permissions" section in the README for more information.',
)
else:
raise
return issue