cve_crawler.py
import logging
from typing import List

import requests

from data_classes import Vulnerability, Advisory  # Ensure these are defined appropriately

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
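
# NOTE: The sketch below is only an assumption about the shape of data_classes.py,
# inferred from the fields used in this module; the real definitions may differ.
# It is kept commented out so it does not shadow the imported classes.
#
# from dataclasses import dataclass
# from typing import List, Optional
#
# @dataclass
# class Vulnerability:
#     package_name: str
#     ecosystem: str
#     first_patched_version: Optional[str]
#     vulnerable_version_range: Optional[str]
#
# @dataclass
# class Advisory:
#     ghsa_id: Optional[str]
#     cve_id: str
#     url: str
#     html_url: str
#     summary: str
#     description: str
#     severity: str
#     published_at: str
#     updated_at: str
#     vulnerabilities: List[Vulnerability]
#     references: List[str]
#     assigner_name: str
#     cvss_score: float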


class CVECrawler:
    NVD_BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0/"
    CIRCL_BASE_URL = "https://cve.circl.lu/api/cve/"

    def __init__(self):
        self.headers = {
            "Accept": "application/json"
        }

    def fetch_cve_ids(self, start_date: str, end_date: str, cvss_version: str) -> List[str]:
        """Fetch the IDs of CRITICAL-severity CVEs published between start_date and end_date (YYYY-MM-DD) from the NVD API."""
        url = (
            f"{self.NVD_BASE_URL}?resultsPerPage=2000&startIndex=0"
            f"&pubStartDate={start_date}T00:00:00.000&pubEndDate={end_date}T23:59:59.999"
            f"&cvssV{cvss_version}Severity=CRITICAL"
        )
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch CVE IDs: {response.status_code}")
        data = response.json()
        vulnerabilities = data.get("vulnerabilities", [])
        return [vuln.get("cve", {}).get("id") for vuln in vulnerabilities if vuln.get("cve")]

    def fetch_cve_details_from_circl(self, cve_id: str) -> dict:
        """
        Fetch detailed CVE information from the CIRCL API for a given CVE ID.
        """
        response = requests.get(f"{self.CIRCL_BASE_URL}{cve_id}", headers=self.headers)
        if response.status_code != 200:
            logger.warning(f"Failed to fetch details for {cve_id} from CIRCL: {response.status_code}")
            return {}
        return response.json()

    def fetch_advisories(self, start_date: str, end_date: str) -> List[Advisory]:
        """Build Advisory objects for all CRITICAL CVEs (CVSS v3 or v4) published in the given date range."""
        cve_ids_v3 = self.fetch_cve_ids(start_date, end_date, '3')
        cve_ids_v4 = self.fetch_cve_ids(start_date, end_date, '4')
        all_cve_ids = set(cve_ids_v3 + cve_ids_v4)

        advisory_objects = []
        for cve_id in all_cve_ids:
            cve_details = self.fetch_cve_details_from_circl(cve_id)
            if not cve_details:
                continue

            containers = cve_details.get("containers", {}).get("cna", {})
            cve_metadata = cve_details.get("cveMetadata", {})

            descriptions = containers.get("descriptions") or [{}]
            description = descriptions[0].get("value", "")
            published_at = cve_metadata.get("datePublished", "")
            date_updated = cve_metadata.get("dateUpdated", "")
            references = containers.get("references", [])

            # Retrieve severity and score, checking CVSS v4 first, then v3 if v4 is not available
            metrics = (containers.get("metrics") or [{}])[0]  # First metric container, if any
            cvss_v4 = metrics.get("cvssV4_0", {})
            cvss_v3 = metrics.get("cvssV3_1", metrics.get("cvssV3_0", {}))
            severity = cvss_v4.get("baseSeverity", cvss_v3.get("baseSeverity", "CRITICAL"))
            cvss_score = cvss_v4.get("baseScore", cvss_v3.get("baseScore", 0))

            # Log missing fields
            missing_fields = []
            if not description:
                missing_fields.append("description")
            if not published_at:
                missing_fields.append("published_at")
            if not date_updated:
                missing_fields.append("date_updated")
            if not references:
                missing_fields.append("references")
            if missing_fields:
                logger.info(f"Advisory {cve_id} missing fields: {', '.join(missing_fields)}")

            vulnerabilities = [
                Vulnerability(
                    package_name=cve_id,
                    ecosystem="UNKNOWN",  # Adjust to your specific context
                    first_patched_version=None,
                    vulnerable_version_range=None,
                )
            ]

            advisory = Advisory(
                ghsa_id=None,
                cve_id=cve_id,
                url=references[0]["url"] if references else "",
                html_url=references[0]["url"] if references else "",
                summary=description,
                description=description,
                severity=severity,
                published_at=published_at,
                updated_at=date_updated,
                vulnerabilities=vulnerabilities,
                references=[r["url"] for r in references],
                assigner_name=cve_metadata.get("assignerShortName", ""),
                cvss_score=cvss_score,
            )
            advisory_objects.append(advisory)

        return advisory_objects


# Usage example
# crawler = CVECrawler()
# advisories = crawler.fetch_advisories("2025-01-14", "2025-01-20")
# for advisory in advisories:
#     print(advisory)
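

# A minimal runnable sketch of the usage above, assuming Advisory and Vulnerability
# are dataclasses (dataclasses.asdict will fail otherwise) and that network access
# to the NVD and CIRCL APIs is available.
if __name__ == "__main__":
    import json
    from dataclasses import asdict

    crawler = CVECrawler()
    advisories = crawler.fetch_advisories("2025-01-14", "2025-01-20")
    logger.info(f"Fetched {len(advisories)} critical advisories")

    # Dump the advisories to a JSON file for inspection.
    with open("advisories.json", "w", encoding="utf-8") as fh:
        json.dump([asdict(advisory) for advisory in advisories], fh, indent=2)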