Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:MISP/misp-stix
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisr3d committed Jan 30, 2023
2 parents 19908ea + 25dbc6b commit 25d5c6b
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 122 deletions.
8 changes: 8 additions & 0 deletions misp_stix_converter/stix2misp/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,18 @@ class ObjectTypeLoadingError(STIXtoMISPError):
pass


class SynonymsResourceJSONError(STIXtoMISPError):
pass


class UnavailableGalaxyResourcesError(STIXtoMISPError):
pass


class UnavailableSynonymsResourceError(STIXtoMISPError):
pass


class UndefinedIndicatorError(STIXtoMISPError):
pass

Expand Down
150 changes: 74 additions & 76 deletions misp_stix_converter/stix2misp/external_stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .stix2_to_misp import (
STIX2toMISPParser, _COURSE_OF_ACTION_TYPING, _GALAXY_OBJECTS_TYPING,
_IDENTITY_TYPING, _SDO_TYPING, _VULNERABILITY_TYPING)
from pathlib import Path
from pymisp import MISPAttribute, MISPGalaxy, MISPObject
from stix2.v20.sdo import (
AttackPattern as AttackPattern_v20, CourseOfAction as CourseOfAction_v20,
Expand All @@ -35,8 +36,8 @@


class ExternalSTIX2toMISPParser(STIX2toMISPParser):
def __init__(self):
super().__init__()
def __init__(self, galaxies_as_tags: Optional[bool] = False):
super().__init__(galaxies_as_tags)
self._mapping = ExternalSTIX2toMISPMapping()

################################################################################
Expand Down Expand Up @@ -156,15 +157,9 @@ def _parse_attack_pattern(self, attack_pattern_ref: str):
if attack_pattern_ref in self._clusters:
self._clusters[attack_pattern_ref]['used'][self.misp_event.uuid] = False
else:
attack_pattern = self._get_stix_object(attack_pattern_ref)
self._clusters[attack_pattern.id] = {
'cluster': self._parse_attack_pattern_cluster(attack_pattern),
'used': {self.misp_event.uuid: False}
}
if 'attack-pattern' not in self._galaxies:
self._galaxies['attack-pattern'] = self._create_galaxy_args(
attack_pattern
)
self._clusters[attack_pattern_ref] = self._parse_galaxy(
attack_pattern_ref
)

def _parse_campaign(self, campaign_ref: str):
"""
Expand All @@ -177,13 +172,7 @@ def _parse_campaign(self, campaign_ref: str):
if campaign_ref in self._clusters:
self._clusters[campaign_ref]['used'][self.misp_event.uuid] = False
else:
campaign = self._get_stix_object(campaign_ref)
self._clusters[campaign.id] = {
'cluster': self._parse_campaign_cluster(campaign),
'used': {self.misp_event.uuid: False}
}
if 'campaign' not in self._galaxies:
self._galaxies['campaign'] = self._create_galaxy_args(campaign)
self._clusters[campaign_ref] = self._parse_galaxy(campaign_ref)

def _parse_course_of_action(self, course_of_action_ref: str):
"""
Expand All @@ -196,15 +185,9 @@ def _parse_course_of_action(self, course_of_action_ref: str):
if course_of_action_ref in self._clusters:
self._clusters[course_of_action_ref]['used'][self.misp_event.uuid] = False
else:
course_of_action = self._get_stix_object(course_of_action_ref)
self._clusters[course_of_action.id] = {
'cluster': self._parse_course_of_action_cluster(course_of_action),
'used': {self.misp_event.uuid: False}
}
if 'course-of-action' not in self._galaxies:
self._galaxies['course-of-action'] = self._create_galaxy_args(
course_of_action
)
self._clusters[course_of_action_ref] = self._parse_galaxy(
course_of_action_ref
)

def _parse_course_of_action_object(self, course_of_action: _COURSE_OF_ACTION_TYPING):
"""
Expand Down Expand Up @@ -235,6 +218,31 @@ def _parse_course_of_action_object(self, course_of_action: _COURSE_OF_ACTION_TYP
'used': {self.misp_event.uuid: False}
}

def _parse_galaxy(self, object_ref: str) -> dict:
object_type = object_ref.split("--")[0]
stix_object = self._get_stix_object(object_ref)
name = stix_object.name
if self.galaxies_as_tags:
tag_names = self._check_existing_galaxy_name(name)
if tag_names is None:
tag_names = [
f'misp-galaxy:{object_type}="{name}"'
]
return {
'tag_names': tag_names,
'used': {self.misp_event.uuid: False}
}
if object_type not in self._galaxies:
self._galaxies[object_type] = self._create_galaxy_args(stix_object)
return {
'cluster': getattr(
self, f"_parse_{object_type.replace('-', '_')}_cluster"
)(
stix_object
),
'used': {self.misp_event.uuid: False}
}

def _parse_identity(self, identity_ref: str):
"""
Identity object parsing function.
Expand Down Expand Up @@ -309,15 +317,9 @@ def _parse_intrusion_set(self, intrusion_set_ref: str):
if intrusion_set_ref in self._clusters:
self._clusters[intrusion_set_ref]['used'][self.misp_event.uuid] = False
else:
intrusion_set = self._get_stix_object(intrusion_set_ref)
self._clusters[intrusion_set.id] = {
'cluster': self._parse_intrusion_set_cluster(intrusion_set),
'used': {self.misp_event.uuid: False}
}
if 'intrusion-set' not in self._galaxies:
self._galaxies['intrusion-set'] = self._create_galaxy_args(
intrusion_set
)
self._clusters[intrusion_set_ref] = self._parse_galaxy(
intrusion_set_ref
)

def _parse_location(self, location_ref: str):
"""
Expand All @@ -340,14 +342,27 @@ def _parse_location(self, location_ref: str):
)
else:
feature = 'region' if not hasattr(location, 'country') else 'country'
self._clusters[location.id] = {
'cluster': getattr(self, f'_parse_{feature}_cluster')(location),
'used': {self.misp_event.uuid: False}
}
if feature not in self._galaxies:
self._galaxies[feature] = self._create_galaxy_args(
location, galaxy_type=feature
)
if self.galaxies_as_tags:
tag_names = self._check_existing_galaxy_name(location)
if tag_names is None:
tag_names = [
f'misp-galaxy:{feature}="{location.name}"'
]
self._clusters[location.id] = {
'tag_names': tag_names,
'used': {self.misp_event.uuid: False}
}
else:
self._clusters[location.id] = {
'cluster': getattr(self, f'_parse_{feature}_cluster')(
location
),
'used': {self.misp_event.uuid: False}
}
if feature not in self._galaxies:
self._galaxies[feature] = self._create_galaxy_args(
location, galaxy_type=feature
)

def _parse_malware(self, malware_ref: str):
"""
Expand All @@ -360,15 +375,7 @@ def _parse_malware(self, malware_ref: str):
if malware_ref in self._clusters:
self._clusters[malware_ref]['used'][self.misp_event.uuid] = False
else:
malware = self._get_stix_object(malware_ref)
self._clusters[malware.id] = {
'cluster': self._parse_malware_cluster(malware),
'used': {self.misp_event.uuid: False}
}
if 'malware' not in self._galaxies:
self._galaxies['malware'] = self._create_galaxy_args(
malware
)
self._clusters[malware_ref] = self._parse_galaxy(malware_ref)

def _parse_observed_data_v20(self, observed_data: ObservedData_v20):
"""
Expand Down Expand Up @@ -423,15 +430,9 @@ def _parse_threat_actor(self, threat_actor_ref: str):
if threat_actor_ref in self._clusters:
self._clusters[threat_actor_ref]['used'][self.misp_event.uuid] = False
else:
threat_actor = self._get_stix_object(threat_actor_ref)
self._clusters[threat_actor.id] = {
'cluster': self._parse_threat_actor_cluster(threat_actor),
'used': {self.misp_event.uuid: False}
}
if 'threat-actor' not in self._galaxies:
self._galaxies['threat-actor'] = self._create_galaxy_args(
threat_actor
)
self._clusters[threat_actor_ref] = self._parse_galaxy(
threat_actor_ref
)

def _parse_tool(self, tool_ref: str):
"""
Expand All @@ -444,13 +445,7 @@ def _parse_tool(self, tool_ref: str):
if tool_ref in self._clusters:
self._clusters[tool_ref]['used'][self.misp_event.uuid] = False
else:
tool = self._get_stix_object(tool_ref)
self._clusters[tool.id] = {
'cluster': self._parse_tool_cluster(tool),
'used': {self.misp_event.uuid: False}
}
if 'tool' not in self._galaxies:
self._galaxies['tool'] = self._create_galaxy_args(tool)
self._clusters[tool_ref] = self._parse_galaxy(tool_ref)

def _parse_vulnerability(self, vulnerability_ref: str):
"""
Expand All @@ -463,13 +458,9 @@ def _parse_vulnerability(self, vulnerability_ref: str):
if vulnerability_ref in self._clusters:
self._clusters[vulnerability_ref]['used'][self.misp_event.uuid] = False
else:
vulnerability = self._get_stix_object(vulnerability_ref)
self._clusters[vulnerability.id] = {
'cluster': self._parse_vulnerability_cluster(vulnerability),
'used': {self.misp_event.uuid: False}
}
if 'vulnerability' not in self._galaxies:
self._galaxies['vulnerability'] = self._create_galaxy_args(vulnerability)
self._clusters[vulnerability_ref] = self._parse_galaxy(
vulnerability_ref
)

def _parse_vulnerability_object(self, vulnerability: _VULNERABILITY_TYPING):
"""
Expand Down Expand Up @@ -924,6 +915,13 @@ def _create_attribute_dict(self, stix_object: _SDO_TYPING) -> dict:
# UTILITY FUNCTIONS. #
################################################################################

def _check_existing_galaxy_name(self, stix_object_name: str) -> Union[list, None]:
if stix_object_name in self.synonyms_mapping:
return self.synonyms_mapping[stix_object_name]
for name, tag_names in self.synonyms_mapping.items():
if stix_object_name in name:
return tag_names

@staticmethod
def _extract_types_from_observable_objects(observable_objects: dict) -> list:
return sorted({observable.type for observable in observable_objects.values()})
Expand Down
97 changes: 95 additions & 2 deletions misp_stix_converter/stix2misp/importparser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# -*- coding: utf-8 -*-
#!/usr/bin/env python3

import json
import subprocess
import traceback
from .exceptions import (SynonymsResourceJSONError, UnavailableGalaxyResourcesError,
UnavailableSynonymsResourceError)
from collections import defaultdict
from pathlib import Path
from pymisp import MISPObject
from stix2.v20.sdo import Indicator as Indicator_v20
from stix2.v21.sdo import Indicator as Indicator_v21
Expand All @@ -13,28 +18,53 @@
Indicator_v20,
Indicator_v21
]
_ROOT_PATH = Path(__file__).parents[1].resolve()

_RFC_VERSIONS = (1, 3, 4, 5)
_UUIDv4 = UUID('76beed5f-7251-457e-8c2a-b45f7b589d3d')


class STIXtoMISPParser:
def __init__(self):
def __init__(self, galaxies_as_tags: bool):
self._identifier: str
self._galaxies: dict = {}
self._clusters: dict = {}
if galaxies_as_tags:
self.__synonyms_path = _ROOT_PATH / 'data' / 'synonymsToTagNames.json'
else:
self._galaxies: dict = {}
self.__galaxies_as_tags = galaxies_as_tags
self.__replacement_uuids: dict = {}
self.__errors: defaultdict = defaultdict(set)
self.__warnings: defaultdict = defaultdict(set)

################################################################################
# PROPERTIES #
################################################################################

@property
def errors(self) -> dict:
return self.__errors

@property
def galaxies_as_tags(self) -> bool:
return self.__galaxies_as_tags

@property
def replacement_uuids(self) -> dict:
return self.__replacement_uuids

@property
def synonyms_mapping(self) -> dict:
try:
return self.__synonyms_mapping
except AttributeError:
self.__get_synonyms_mapping()
return self.__synonyms_mapping

@property
def synonyms_path(self) -> Path:
return self.__synonyms_path

@property
def warnings(self) -> defaultdict:
return self.__warnings
Expand Down Expand Up @@ -168,6 +198,69 @@ def _vulnerability_error(self, vulnerability_id: str, exception: Exception):
message = f"Error with the Vulnerability object with id {vulnerability_id}: {tb}"
self.__errors[self._identifier].add(message)

################################################################################
# SYNONYMS TO GALAXY TAG NAMES MAPPING HANDLING FUNCTIONS. #
################################################################################

def __galaxies_up_to_date(self) -> bool:
fingerprint_path = _ROOT_PATH / 'data' / 'synonymsToTagNames.fingerprint'
if not fingerprint_path.exists():
return False
latest_fingerprint = self.__get_misp_galaxy_fingerprint()
if latest_fingerprint is None:
return False
with open(fingerprint_path, 'rt', encoding='utf-8') as f:
fingerprint = f.read()
return fingerprint == latest_fingerprint

def __generate_synonyms_mapping(self):
data_path = _ROOT_PATH / 'data' / 'misp-galaxy' / 'clusters'
if not data_path.exists():
raise UnavailableGalaxyResourcesError(data_path)
synonyms_mapping = defaultdict(list)
for filename in data_path.glob('*.json'):
with open(filename, 'rt', encoding='utf-8') as f:
cluster_definition = json.loads(f.read())
cluster_type = f"misp-galaxy:{cluster_definition['type']}"
for cluster in cluster_definition['values']:
value = cluster['value']
tag_name = f'{cluster_type}="{value}"'
synonyms_mapping[value].append(tag_name)
if cluster.get('meta') is not None and cluster['meta'].get('synonyms') is not None:
for synonym in cluster['meta']['synonyms']:
synonyms_mapping[synonym].append(tag_name)
with open(self.synonyms_path, 'wt', encoding='utf-8') as f:
f.write(json.dumps(synonyms_mapping))
latest_fingerprint = self.__get_misp_galaxy_fingerprint()
if latest_fingerprint is not None:
fingerprint_path = _ROOT_PATH / 'data' / 'synonymsToTagNames.fingerprint'
with open(fingerprint_path, 'wt', encoding='utf-8') as f:
f.write(latest_fingerprint)

@staticmethod
def __get_misp_galaxy_fingerprint():
galaxy_path = _ROOT_PATH / 'data' / 'misp-galaxy'
status = subprocess.Popen(
[
'git',
'submodule',
'status',
galaxy_path
],
stdout=subprocess.PIPE
)
stdout = status.communicate()[0]
try:
return stdout.decode().split(' ')[1]
except IndexError:
return None

def __get_synonyms_mapping(self):
if not self.synonyms_path.exists() or not self.__galaxies_up_to_date():
self.__generate_synonyms_mapping()
with open(self.synonyms_path, 'rt', encoding='utf-8') as f:
self.__synonyms_mapping = json.loads(f.read())

################################################################################
# UUID SANITATION HANDLING FUNCTIONS #
################################################################################
Expand Down
Loading

0 comments on commit 25d5c6b

Please sign in to comment.