Skip to content

Commit

Permalink
Merge branch 'main' of github.com:MISP/misp-stix
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisr3d committed Jun 9, 2023
2 parents e7eea03 + ef6aca8 commit 0b54557
Show file tree
Hide file tree
Showing 25 changed files with 1,509 additions and 613 deletions.
154 changes: 111 additions & 43 deletions misp_stix_converter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
__version__ = '2.4.170'
__version__ = '2.4.172'

import argparse
from .misp_stix_mapping import Mapping
from .misp2stix import *
# Helpers
from .misp_stix_converter import (
_from_misp, misp_attribute_collection_to_stix1, misp_collection_to_stix2_0,
misp_collection_to_stix2_1, misp_event_collection_to_stix1, misp_to_stix1,
misp_to_stix2_0, misp_to_stix2_1, stix_1_to_misp, stix_2_to_misp)
_from_misp, misp_attribute_collection_to_stix1, misp_collection_to_stix2,
misp_event_collection_to_stix1, misp_to_stix1, misp_to_stix2,
stix_1_to_misp, stix_2_to_misp)
# STIX 1 special halpers
from .misp_stix_converter import (
_get_campaigns, _get_courses_of_action, _get_events, _get_indicators,
_get_observables, _get_threat_actors, _get_ttps)
_get_observables, _get_threat_actors, _get_ttps, _from_misp)
# STIX 1 footers
from .misp_stix_converter import (
_get_campaigns_footer, _get_courses_of_action_footer, _get_indicators_footer,
Expand All @@ -28,43 +28,51 @@

def main():
parser = argparse.ArgumentParser(description='Convert MISP <-> STIX')

feature_parser = parser.add_mutually_exclusive_group(required=True)
feature_parser.add_argument(
'-e', '--export', action='store_true', help='Export MISP to STIX.'
parser.add_argument(
'--debug', action='store_true', help='Show errors and warnings'
)
feature_parser.add_argument(
'-i', '--import', action='store_true', help='Import STIX to MISP.'

# SUBPARSERS TO SEPARATE THE 2 MAIN FEATURES
subparsers = parser.add_subparsers(
title='Main feature', dest='feature', required=True
)

parser.add_argument(
'-v', '--version', choices=['1.1.1', '1.2', '2.0', '2.1'],
required=True, help='STIX version.'
# EXPORT SUBPARSER
export_parser = subparsers.add_parser(
'export', help='Export MISP to STIX - try '
'`misp_stix_converter export -h` for more help.'
)
parser.add_argument(
export_parser.add_argument(
'-f', '--file', nargs='+', type=Path, required=True,
help='Path to the file(s) to convert.'
)
parser.add_argument(
export_parser.add_argument(
'-v', '--version', choices=['1.1.1', '1.2', '2.0', '2.1'],
required=True, help='STIX specific version.'
)
export_parser.add_argument(
'-s', '--single_output', action='store_true',
help='Produce only one result file (in case of multiple input file).'
)
parser.add_argument(
'-t', '--tmp_files', action='store_true',
help='Store result in file (in case of multiple result files) '
'instead of keeping it in memory only.'
)
parser.add_argument(
'-o', '--output_name', type=Path, help='Output file name'
export_parser.add_argument(
'-m', '--in_memory', action='store_true',
help='Store result in memory (in case of multiple result files) '
'instead of storing it in tmp files.'
)
parser.add_argument(
export_parser.add_argument(
'--output_dir', type=Path,
help='Output path for the conversion results.'
help='Output path - used in the case of multiple input files when the '
'`single_output` argument is not used.'
)

stix1_parser = parser.add_argument_group('STIX 1 specific parameters')
export_parser.add_argument(
'-o', '--output_name', type=Path,
help='Output file name - used in the case of a single input file or '
'when the `single_output` argument is used.'
)
# STIX 1 EXPORT SPECIFIC ARGUMENTS
stix1_parser = export_parser.add_argument_group('STIX 1 specific arguments')
stix1_parser.add_argument(
'--feature', default='event', choices=['attribute', 'event'],
'--level', default='event', choices=['attribute', 'event'],
help='MISP data structure level.'
)
stix1_parser.add_argument(
Expand All @@ -79,22 +87,82 @@ def main():
'-org', default='MISP',
help='Organisation name to be used in the STIX 1 header.'
)
export_parser.set_defaults(func=_misp_to_stix)

# IMPORT SUBPARSER
import_parser = subparsers.add_parser(
'import', help='Import STIX to MISP - try '
'`misp_stix_converter import -h` for more help.'
)
import_parser.add_argument(
'-f', '--file', nargs='+', type=Path, required=True,
help='Path to the file(s) to convert.'
)
import_parser.add_argument(
'-v', '--version', choices=['1', '2'],
required=True, help='STIX major version.'
)
import_parser.add_argument(
'-s', '--single_output', action='store_true',
help='Produce only one MISP event per STIX file'
'(in case of multiple Report, Grouping or Incident objects).'
)
import_parser.add_argument(
'-o', '--output_name', type=Path,
help='Output file name - used in the case of a single input file or '
'when the `single_output` argument is used.'
)
import_parser.add_argument(
'--output_dir', type=Path,
help='Output path - used in the case of multiple input files when the '
'`single_output` argument is not used.'
)
import_parser.add_argument(
'-d', '--distribution', type=int, default=0,
help='Distribution level for the imported MIPS content.'
)
import_parser.add_argument(
'-sg', '--sharing_group', type=int, default=None,
help='Sharing group ID when distribution is 4.'
)
import_parser.add_argument(
'--galaxies_as_tags', action='store_true',
help='Import MISP Galaxies as tag names instead of the standard Galaxy format.'
)
import_parser.set_defaults(func=_stix_to_misp)

stix_args = parser.parse_args()
if len(stix_args.file) > 1 and stix_args.single_output and stix_args.output_dir is None:
stix_args.output_dir = Path(__file__).parents[1] / 'tmp'

results = _misp_to_stix(stix_args) if stix_args.export else _stix_to_misp(stix_args)
if isinstance(results, list):
files = '\n - '.join(str(result) for result in results)
print(
'Successfully processed your '
f"{'files' if len(results) > 1 else 'file'}. Results available in:"
f"\n - {files}"
)
else:
print(
'Successfully processed your '
f"{'files' if len(stix_args.file) > 1 else 'file'}. "
f"Results available in {results}"
)
feature = 'MISP to STIX' if stix_args.feature == 'export' else 'STIX to MISP'
try:
traceback = stix_args.func(stix_args)
for field in ('errors', 'warnings'):
if field in traceback:
messages = '\n - '.join(traceback[field])
print(f'{field.capitalize()} encountered during the '
f'{feature} conversion process:\n - {messages}')
if 'fails' in traceback:
fails = '\n - '.join(traceback['fails'])
print('Failed parsing the following - and the related error '
f'message:\n - {fails}')
if 'results' in traceback:
results = traceback['results']
if isinstance(results, list):
files = '\n - '.join(str(result) for result in results)
print(
'Successfully processed your '
f"{'files' if len(results) > 1 else 'file'}. Results "
f"available in:\n - {files}"
)
else:
print(
'Successfully processed your '
f"{'files' if len(stix_args.file) > 1 else 'file'}. "
f"Results available in {results}"
)
else:
print(f'No result from the {feature} conversion.')
except Exception as exception:
print(f'Breaking exception encountered during the {feature} conversion '
f'process: {exception.__str__()}')
46 changes: 33 additions & 13 deletions misp_stix_converter/misp2stix/framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,38 @@
from mixbox import idgen
from mixbox.namespaces import Namespace
from stix.core import STIXHeader, STIXPackage
from typing import Optional
from uuid import uuid4
from typing import Optional, Union
from uuid import uuid4, UUID
from .stix1_mapping import NS_DICT, SCHEMALOC_DICT

json_footer = ']}\n'
_UUID_typing = Union[UUID, str]


def stix1_attributes_framing(namespace: str, orgname: str, return_format: str, version: str) -> tuple:
stix_package = _stix_package(orgname, version)
def stix1_attributes_framing(namespace: str, orgname: str, return_format: str,
version: str) -> tuple:
stix_package = _create_stix_package(orgname, version)
return _stix1_attributes_framing(
namespace, orgname, return_format, stix_package
)


def _stix1_attributes_framing(namespace: str, orgname: str, return_format: str,
stix_package: STIXPackage) -> tuple:
if return_format == 'xml':
namespaces = _handle_namespaces(namespace, orgname)
return _stix_xml_attributes_framing(stix_package, namespaces)
return _stix_json_attributes_framing(stix_package)


def stix1_framing(namespace: str, orgname: str, return_format: str, version: str) -> tuple:
stix_package = _stix_package(orgname, version)
def stix1_framing(namespace: str, orgname: str, return_format: str,
version: str) -> tuple:
stix_package = _create_stix_package(orgname, version)
return _stix1_framing(namespace, orgname, return_format, stix_package)


def _stix1_framing(namespace: str, orgname: str, return_format: str,
stix_package: STIXPackage) -> tuple:
if return_format == 'xml':
namespaces = _handle_namespaces(namespace, orgname)
return _stix_xml_framing(stix_package, namespaces)
Expand All @@ -35,14 +50,14 @@ def stix_xml_separator():
return f" </{header}>\n <{header}>\n"


def stix20_framing(uuid: Optional[str] = None) -> tuple:
def stix20_framing(uuid: Optional[_UUID_typing] = None) -> tuple:
header = '{"type": "bundle", "spec_version": "2.0", "id":'
if uuid is None:
uuid = uuid4()
return f'{header} "bundle--{uuid}", "objects": [', ', ', json_footer


def stix21_framing(uuid: Optional[str] = None) -> tuple:
def stix21_framing(uuid: Optional[_UUID_typing] = None) -> tuple:
header = '{"type": "bundle", "id":'
if uuid is None:
uuid = uuid4()
Expand Down Expand Up @@ -72,7 +87,9 @@ def _stix_json_framing(stix_package: STIXPackage) -> tuple:
return header, ', ', ']}}'


def _stix_package(orgname: str, version: str, uuid: Optional[str] = None) -> STIXPackage:
def _create_stix_package(
orgname: str, version: str, header: Optional[bool] = True,
uuid: Optional[_UUID_typing] = None) -> STIXPackage:
parsed_orgname = re.sub('[\W]+', '', orgname.replace(' ', '_'))
if uuid is None:
uuid = uuid4()
Expand All @@ -81,10 +98,11 @@ def _stix_package(orgname: str, version: str, uuid: Optional[str] = None) -> STI
timestamp=datetime.datetime.now()
)
stix_package.version = version
stix_header = STIXHeader()
stix_header.title = f"Export from {orgname}'s MISP"
stix_header.package_intents = 'Threat Report'
stix_package.stix_header = stix_header
if header:
stix_header = STIXHeader()
stix_header.title = f"Export from {orgname}'s MISP"
stix_header.package_intents = 'Threat Report'
stix_package.stix_header = stix_header
return stix_package


Expand All @@ -99,6 +117,8 @@ def _stix_xml_framing(stix_package: STIXPackage, namespaces: dict) -> tuple:
s_related = "stix:Related_Package"
header = stix_package.to_xml(auto_namespace=False, ns_dict=namespaces, schemaloc_dict=SCHEMALOC_DICT)
header = header.decode()
if header.endswith('/>\n'):
header = f'{header[:-3]}>\n'
header = f"{header.replace(s_stix, '')} <{s_related}s>\n <{s_related}>\n"
footer = f" </{s_related}>\n </{s_related}s>\n{s_stix}"
separator = f" </{s_related}>\n <{s_related}>\n"
Expand Down
9 changes: 5 additions & 4 deletions misp_stix_converter/misp2stix/misp_to_stix1.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import json
import re
import socket
from .stix1_mapping import MISPtoSTIX1Mapping
from .exportparser import MISPtoSTIXParser
from .framing import _create_stix_package as _stix_package
from .stix1_mapping import MISPtoSTIX1Mapping
from abc import ABCMeta
from base64 import b64encode
from collections import defaultdict
Expand Down Expand Up @@ -306,9 +307,9 @@ def _parse_file_attribute(self, attribute: dict):
self._handle_attribute(attribute, observable)

def _parse_hash_attribute(self, attribute: dict):
hash = self._parse_hash_value(attribute['type'], attribute['value'])
_hash = self._parse_hash_value(attribute['type'], attribute['value'])
file_object = File()
file_object.add_hash(hash)
file_object.add_hash(_hash)
observable = self._create_observable(file_object, attribute['uuid'], 'File')
self._handle_attribute(attribute, observable)

Expand Down Expand Up @@ -1155,7 +1156,7 @@ def parse_json_content(self, filename):
with open(filename, 'rt', encoding='utf-8') as f:
json_content = json.loads(f.read())
if json_content.get('response'):
package = STIXPackage()
package = _stix_package(self._orgname, self._version, header=False)
for event in json_content['response']:
self.parse_misp_event(event)
package.add_related_package(self._stix_package)
Expand Down
Loading

0 comments on commit 0b54557

Please sign in to comment.