refactor: support for semgrep as the code analysis tool
art1f1c3R committed Jan 23, 2025
1 parent ecb5c87 commit 5381f4a
Showing 4 changed files with 182 additions and 189 deletions.
4 changes: 4 additions & 0 deletions src/macaron/errors.py
@@ -105,3 +105,7 @@ class HeuristicAnalyzerValueError(MacaronError):

class LocalArtifactFinderError(MacaronError):
"""Happens when there is an error looking for local artifacts."""


class SourceCodeError(MacaronError):
"""Error for operations on package source code."""
@@ -11,16 +11,20 @@
import base64
import binascii
import ipaddress
import json
import logging
import os
import re
from dataclasses import dataclass
import subprocess # nosec
import tempfile
from collections import defaultdict
from typing import Any

import yaml

from macaron.config.defaults import defaults
from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

@@ -31,21 +35,6 @@
CALLS = "calls"


@dataclass(frozen=True)
class Import:
"""Data class to hold information about extracted import statements.
Name, alias, and module are referring to the following patterns of python import statements:
- [from <module>] import <name> [as <alias>]
"""

name: str
alias: str | None
module: str | None
lineno: int
statement: str


class PyPISourcecodeAnalyzer:
"""This class is used to analyze the source code of python PyPI packages. This analyzer is a work in progress.
@@ -70,6 +59,7 @@ class PyPISourcecodeAnalyzer:
def __init__(self) -> None:
"""Collect required data for analysing the source code."""
self.suspicious_patterns = self._load_defaults()
self.rule_files: list = []

def _load_defaults(self) -> dict[str, dict[str, list]]:
"""Load the suspicious pattern from suspicious_pattern.yaml.
@@ -106,7 +96,7 @@ def _load_defaults(self) -> dict[str, dict[str, list]]:
with open(filename, encoding="utf-8") as file:
configured_patterns: dict[str, JsonType] = yaml.safe_load(file)
except FileNotFoundError as file_error:
error_msg = f"Unable to open locate {filename}"
error_msg = f"Unable to locate {filename}"
logger.debug(error_msg)
raise ConfigurationError(error_msg) from file_error
except yaml.YAMLError as yaml_error:
@@ -162,44 +152,60 @@ def analyze_patterns(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
HeuristicAnalyzerValueError
if there is no source code available.
"""
analysis_result: dict = {}
analysis_result: defaultdict = defaultdict(list)
semgrep_commands: list[str] = ["semgrep", "scan"]
result: HeuristicResult = HeuristicResult.PASS

source_code = pypi_package_json.package_sourcecode
if not source_code:
error_msg = "Unable to retrieve PyPI package source code"
source_code_path = pypi_package_json.package_sourcecode_path
if not source_code_path:
error_msg = "Unable to retrieve PyPI package source code path"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

for filename, content in source_code.items():
detail_info = {}
self._create_rules()
for rule_file in self.rule_files:
semgrep_commands.extend(["--config", rule_file.name])
semgrep_commands.append(source_code_path)

with tempfile.NamedTemporaryFile(mode="w+", delete=True) as output_json_file:
semgrep_commands.append(f"--json-output={output_json_file.name}")
try:
_ = ast.parse(content)
except (SyntaxError, ValueError) as ast_parse_error:
logger.debug("File %s cannot be parsed as a python file: %s", filename, ast_parse_error)
continue
process = subprocess.run(semgrep_commands, check=True, capture_output=True) # nosec
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as semgrep_error:
error_msg = (
f"Unable to run semgrep on {source_code_path} with arguments {semgrep_commands}: {semgrep_error}"
)
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg) from semgrep_error

imports = self._extract_imports(content)
import_names = set()
for i in imports:
if i.module:
import_names.add(".".join([i.module, i.name]))
import_names.add(i.name)
if process.returncode != 0:
error_msg = f"Error running semgrep on {source_code_path} with arguments" f" {process.args}"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)
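            # For illustration only (not part of the commit): with one generated
            # rule file, the assembled command mirrors the flags built above, e.g.
            #   semgrep scan --config /tmp/imports_ab12cd.yaml /tmp/extracted_sdist \
            #       --json-output=/tmp/tmp3xyz.json
            # where both temp paths are invented placeholder names.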

for category, patterns in self.suspicious_patterns[IMPORTS].items():
category_info = []
semgrep_output = json.loads(output_json_file.read())

suspicious_imports = set.intersection(import_names, set(patterns))
if suspicious_imports:
category_info = [i for i in imports if i.name in suspicious_imports]
result = HeuristicResult.FAIL
if not semgrep_output:
return result, {}

detail_info[category] = category_info
semgrep_findings = json_extract(semgrep_output, ["results"], list)
if not semgrep_findings:
return result, {}

analysis_result[filename] = {IMPORTS: detail_info}
result = HeuristicResult.FAIL # some semgrep rules were triggered
for finding in semgrep_findings:
category = json_extract(finding, ["check_id"], str)
if not category:
continue

return result, analysis_result
file = json_extract(finding, ["path"], str)
start = json_extract(finding, ["start", "line"], int)
end = json_extract(finding, ["end", "line"], int)
analysis_result[category].append({"file": file, "start": start, "end": end})

self._clear_rules()

return result, dict(analysis_result)
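
    # Illustration (not part of the commit): an abridged semgrep JSON report of
    # the shape the json_extract calls above rely on; the category, path, and
    # line numbers are invented.
    #
    # example_output = {
    #     "results": [
    #         {
    #             "check_id": "obfuscation",  # rule id == suspicious-pattern category
    #             "path": "package/setup.py",
    #             "start": {"line": 10},
    #             "end": {"line": 10},
    #         }
    #     ]
    # }
    #
    # The loop above would turn this report into:
    #   {"obfuscation": [{"file": "package/setup.py", "start": 10, "end": 10}]}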

def analyze_dataflow(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Analyze the source code of the package for malicious dataflow.
@@ -253,122 +259,43 @@ def analyze_dataflow(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:

return result, analysis_result

def _extract_imports(self, content: str) -> set[Import]:
try:
return self._extract_imports_from_ast(content)
except SyntaxError:
return self._extract_imports_from_lines(content)
def _create_rules(self) -> None:
rule_list: list[dict[str, Any]] = []
contents: dict = {}

def _extract_imports_from_ast(self, content: str) -> set[Import]:
"""Extract imports from source code using the parsed AST.
if self.rule_files:
self._clear_rules()

Parameters
----------
source_content: str
The source code as a string.
# import rules
for category, patterns in self.suspicious_patterns[IMPORTS].items():
rule: dict[str, Any] = {}
pattern_list: list = []

Returns
-------
set[str]
The set of imports.
rule["id"] = category
rule["severity"] = "ERROR"
rule["languages"] = ["python"]
rule["message"] = f"Detected suspicious imports from the '{category}' category"

Raises
------
SyntaxError
If the code could not be parsed.
"""
imports = set()
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.add(Import(alias.name, alias.asname, None, alias.lineno, ""))
elif isinstance(node, ast.ImportFrom):
module = node.module
if module:
_module = "." * node.level + module
for name in node.names:
imports.add(Import(name.name, name.asname, _module, name.lineno, ""))
return imports

def _extract_imports_from_lines(self, content: str) -> set[Import]:
"""Extract imports from source code using per line pattern matching.
for pattern in patterns:
pattern_list.append({"pattern": f"import {pattern}"})
pattern_list.append({"pattern": f"from {pattern} import $X"})
pattern_list.append({"pattern": f'__import__("{pattern}")'})

Parameters
----------
source_content: str
The source code as a string.
rule["pattern-either"] = pattern_list
rule_list.append(rule)

Returns
-------
set[str]
The list of imports.
"""
alias_pattern = r"\s+as\s+\w+(?:\.{0,1}\w+)*"
# Pattern for module aliases.

module_name = r"\w+(?:\.{0,1}\w+"
# <module_name> as described under pattern_import.

pattern_import = (
r"(?:import\s+)(" + module_name + r")*(?:" + alias_pattern + r")?"
r"(?:(?:\s*,\s*)(?:" + module_name + r")*(?:" + alias_pattern + r")?))*)(?:(?:\s|#).*)?"
)
# Allows for a standard import statement.
# E.g.: import <module_name(s)> <other_text>
# Where <module_name(s)> consists of one or more <module_name>.
# Where <module_name> consists of one or more words (a-z or 0-9 or underscore) separated by periods,
# with an optional alias.
# Where <other_text> allows any character(s) either after a single space or a hash (#).

pattern_from_import = (
r"(?:from\s+)([.]*"
+ module_name
+ r")*)(?:\s+import\s+(\w+(?:\s+as\s+\w+)?(?:(?:\s*,\s*)(?:\w+(?:\s+as\s+\w+)?))*))"
)
# Allows for a from import statement.
# E.g.: from <module_name> import <module_component(s)> <other_text>
# Where <module_name> is as above, but can also be preceded by any number of periods.
# (Note only a single module can be placed here.)
# Where <module_component(s)> consists of one or more <module_component> with optional aliases.
# Where <module_component> is identical to <module_name> except without any periods.
# Where <other_text> requires at least one space followed by one or more word characters, plus
# any other characters following on from that.

combined_pattern = f"^(?:{pattern_import})|(?:{pattern_from_import})$"
# The combined pattern creates two match groups:
# 1 - standard import statement.
# 2 - from import statement module.
# 3 - from import statement module components.

imports = set()
for lineno, line in enumerate(content.splitlines()):
line.strip()
match = re.match(combined_pattern, line)
if not match:
continue
contents = {"rules": rule_list}

with tempfile.NamedTemporaryFile(
"w", prefix=f"{IMPORTS}_", suffix=".yaml", delete=False
) as import_patterns_file:
yaml.dump(contents, import_patterns_file)
self.rule_files.append(import_patterns_file)

if match.group(1):
# Standard import, handle commas and aliases if present.
splits = self._prune_aliased_lines(match.group(1), alias_pattern)
for split in splits:
imports.add(Import(split, None, None, lineno, ""))
elif match.group(2):
# From import
if match.group(3):
splits = self._prune_aliased_lines(match.group(3), alias_pattern)
for split in splits:
imports.add(Import(split, None, match.group(2), lineno, ""))
return imports

def _prune_aliased_lines(self, text: str, alias_pattern: str) -> list[str]:
"""Split the line on commas and remove any aliases from individual parts."""
results = []
splits = text.split(",")
for split in splits:
split = split.strip()
results.append(re.sub(alias_pattern, "", split))
return results
def _clear_rules(self) -> None:
for file in self.rule_files:
file.close()
self.rule_files.clear()

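As a sanity check on _create_rules, here is a standalone sketch of the YAML one category would produce; the "exfiltration" category and "socket" pattern are invented stand-ins for real entries in suspicious_pattern.yaml:

import yaml

rule = {
    "id": "exfiltration",
    "severity": "ERROR",
    "languages": ["python"],
    "message": "Detected suspicious imports from the 'exfiltration' category",
    "pattern-either": [
        {"pattern": "import socket"},
        {"pattern": "from socket import $X"},
        {"pattern": '__import__("socket")'},
    ],
}
print(yaml.dump({"rules": [rule]}, sort_keys=False))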

class DataFlowTracer(ast.NodeVisitor):
@@ -406,7 +406,11 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:

# Create an AssetLocator object for the PyPI package JSON object.
pypi_package_json = PyPIPackageJsonAsset(
component=ctx.component, pypi_registry=pypi_registry, package_json={}, package_sourcecode={}
component=ctx.component,
pypi_registry=pypi_registry,
package_json={},
package_sourcecode={},
package_sourcecode_path="",
)

pypi_registry_info.metadata.append(pypi_package_json)
@@ -437,6 +441,8 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
confidence = Confidence.LOW
result_type = CheckResultType.FAILED

pypi_package_json.cleanup_sourcecode()
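        # Inferred from the name and the new package_sourcecode_path field:
        # this presumably removes the downloaded/extracted source tree once the
        # heuristics have run; its definition is not shown in this diff.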

result_tables.append(
MaliciousMetadataFacts(
result=heuristic_results,