Skip to content

Commit

Permalink
Merge pull request #45 from ipa-lab/decision_making
Browse files Browse the repository at this point in the history
Decision making
  • Loading branch information
andreashappe authored May 13, 2024
2 parents 0485f32 + c19651c commit f79b6af
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ __pycache__/
.idea/
*.sqlite3
*.sqlite3-jounal
*.sqlite
4 changes: 2 additions & 2 deletions capabilities/ssh_run_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class SSHRunCommand(Capability):
def describe(self, name: str = None) -> str:
return f"give a command to be executed on the shell and I will respond with the terminal output when running this command on the linux server. The given command must not require user interaction. Only state the to be executed command. The command should be used for enumeration or privilege escalation."

def __call__(self, command: str) -> Tuple[str, bool]:
def __call__(self, command: str, timeout:int=10) -> Tuple[str, bool]:
got_root = False
sudo_pass = Responder(
pattern=r'\[sudo\] password for ' + self.conn.username + ':',
Expand All @@ -32,7 +32,7 @@ def __call__(self, command: str) -> Tuple[str, bool]:
out = StringIO()

try:
resp = self.conn.run(command, pty=True, warn=True, out_stream=out, watchers=[sudo_pass], timeout=10)
resp = self.conn.run(command, pty=True, warn=True, out_stream=out, watchers=[sudo_pass], timeout=timeout)
except Exception as e:
print("TIMEOUT! Could we have become root?")
out.seek(0)
Expand Down
2 changes: 1 addition & 1 deletion docs/linux_privesc.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Please note, that the last 3-4 features are slowly migrated directly into the fr

This is a simple example run of `wintermute.py` using GPT-4 against a vulnerable VM. More example runs can be seen in [our collection of historic runs](docs/old_runs/old_runs.md).

![Example wintermute run](/docs/old_runs/example_run_gpt4.png)
![Example wintermute run](old_runs/example_run_gpt4.png)

Some things to note:

Expand Down
1 change: 0 additions & 1 deletion usecases/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .usecase import *
from .privesc import *
from .minimal import *
from .web import *
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dataclasses import dataclass
from rich.panel import Panel
from usecases.usecase import UseCase
from usecases.base import UseCase
from utils import Console, DbStorage
from utils.openai.openai_llm import OpenAIConnection

Expand Down Expand Up @@ -57,4 +57,5 @@ def run(self):
self.log_db.run_was_failure(self._run_id, turn)
self.console.print(Panel("[green]maximum turn number reached", title="Run finished"))

self.teardown()
self.teardown()
return self._got_root
4 changes: 2 additions & 2 deletions usecases/minimal/minimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from capabilities import Capability, SSHRunCommand, SSHTestCredential
from utils import SSHConnection, llm_util
from usecases.usecase import use_case
from usecases.usecase.roundbased import RoundBasedUseCase
from usecases.base import use_case
from usecases.common_patterns import RoundBasedUseCase
from utils.cli_history import SlidingCliHistory

template_dir = pathlib.Path(__file__).parent
Expand Down
3 changes: 2 additions & 1 deletion usecases/privesc/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .privesc import LinuxPrivesc, WindowsPrivesc
from .linux import *
from .windows import *
51 changes: 10 additions & 41 deletions usecases/privesc/privesc.py → usecases/privesc/common.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import abc
import json
import pathlib
from dataclasses import dataclass, field
from typing import Dict

from mako.template import Template
from rich.panel import Panel

from capabilities import Capability, SSHRunCommand, SSHTestCredential, PSExecRunCommand, PSExecTestCredential
from utils import SSHConnection, PSExecConnection, llm_util, ui
from usecases.usecase import use_case, UseCase
from usecases.usecase.roundbased import RoundBasedUseCase
from capabilities import Capability
from utils import llm_util, ui
from usecases.base import UseCase
from usecases.common_patterns import RoundBasedUseCase
from utils.cli_history import SlidingCliHistory

template_dir = pathlib.Path(__file__).parent / "templates"
template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt"))
template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt"))
template_state = Template(filename=str(template_dir / "update_state.txt"))
template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt"))

@dataclass
class Privesc(RoundBasedUseCase, UseCase, abc.ABC):
Expand All @@ -25,26 +25,18 @@ class Privesc(RoundBasedUseCase, UseCase, abc.ABC):
enable_explanation: bool = False
enable_update_state: bool = False
disable_history: bool = False
hints: str = ""
hint: str = ""

_sliding_history: SlidingCliHistory = None
_state: str = ""
_hint: str = None
_capabilities: Dict[str, Capability] = field(default_factory=dict)

def init(self):
super().init()

def setup(self):
if self.hints != "":
try:
with open(self.hints, "r") as hint_file:
hints = json.load(hint_file)
if self.conn.hostname in hints:
self._hint = hints[self.conn.hostname]
self.console.print(f"[bold green]Using the following hint: '{self._hint}'")
except:
self.console.print("[yellow]Was not able to load hint file")
if self.hint != "":
self.console.print(f"[bold green]Using the following hint: '{self.hint}'")

if self.disable_history == False:
self._sliding_history = SlidingCliHistory(self.llm)
Expand Down Expand Up @@ -107,7 +99,7 @@ def get_next_command(self):
if not self.disable_history:
history = self._sliding_history.get_history(self.llm.context_size - llm_util.SAFETY_MARGIN - state_size - template_size)

cmd = self.llm.get_response(template_next_cmd, _capabilities=self._capabilities, history=history, state=self._state, conn=self.conn, system=self.system, update_state=self.enable_update_state, target_user="root", hint=self._hint)
cmd = self.llm.get_response(template_next_cmd, _capabilities=self._capabilities, history=history, state=self._state, conn=self.conn, system=self.system, update_state=self.enable_update_state, target_user="root", hint=self.hint)
cmd.result = llm_util.cmd_output_fixer(cmd.result)
return cmd

Expand All @@ -129,27 +121,4 @@ def update_state(self, cmd, result):

result = self.llm.get_response(template_state, cmd=cmd, resp=result, facts=self._state)
self._state = result.result
return result

@use_case("linux_privesc", "Linux Privilege Escalation")
@dataclass
class LinuxPrivesc(Privesc):
conn: SSHConnection = None
system: str = "linux"

def init(self):
super().init()
self._capabilities["run_command"] = SSHRunCommand(conn=self.conn)
self._capabilities["test_credential"] = SSHTestCredential(conn=self.conn)


@use_case("windows_privesc", "Windows Privilege Escalation")
@dataclass
class WindowsPrivesc(Privesc):
conn: PSExecConnection = None
system: str = "Windows"

def init(self):
super().init()
self._capabilities["run_command"] = PSExecRunCommand(conn=self.conn)
self._capabilities["test_credential"] = PSExecTestCredential(conn=self.conn)
return result
150 changes: 150 additions & 0 deletions usecases/privesc/linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import abc
import json
import pathlib
from dataclasses import dataclass

from mako.template import Template

from capabilities import SSHRunCommand, SSHTestCredential
from usecases.privesc.common import Privesc
from utils import SSHConnection
from usecases.base import use_case, UseCase
from utils.console.console import Console
from utils.db_storage.db_storage import DbStorage
from utils.openai.openai_llm import OpenAIConnection

template_dir = pathlib.Path(__file__).parent / "templates"
template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt"))
template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt"))
template_state = Template(filename=str(template_dir / "update_state.txt"))
template_lse = Template(filename=str(template_dir / "get_hint_from_lse.txt"))

@use_case("linux_privesc_hintfile", "Linux Privilege Escalation using a hints file")
@dataclass
class PrivescWithHintFile(UseCase, abc.ABC):
conn: SSHConnection = None
system: str = ''
enable_explanation: bool = False
enable_update_state: bool = False
disable_history: bool = False
hints: str = ""

# all of these would typically be set by RoundBasedUseCase :-/
# but we need them here so that we can pass them on to the inner
# use-case
log_db: DbStorage = None
console: Console = None
llm: OpenAIConnection = None
tag: str = ""
max_turns: int = 10

def init(self):
super().init()

# simple helper that reads the hints file and returns the hint
# for the current machine (test-case)
def read_hint(self):
if self.hints != "":
try:
with open(self.hints, "r") as hint_file:
hints = json.load(hint_file)
if self.conn.hostname in hints:
return hints[self.conn.hostname]
except:
self.console.print("[yellow]Was not able to load hint file")
else:
self.console.print("[yellow]calling the hintfile use-case without a hint file?")
return ""

def run(self):
# read the hint
hint = self.read_hint()

# call the inner use-case
priv_esc = LinuxPrivesc(
conn=self.conn, # must be set in sub classes
enable_explanation=self.enable_explanation,
disable_history=self.disable_history,
hint=hint,
log_db = self.log_db,
console = self.console,
llm = self.llm,
tag = self.tag,
max_turns = self.max_turns
)

priv_esc.init()
priv_esc.run()

@use_case("linux_privesc_guided", "Linux Privilege Escalation using lse.sh for initial guidance")
@dataclass
class PrivescWithLSE(UseCase, abc.ABC):
conn: SSHConnection = None
system: str = ''
enable_explanation: bool = False
enable_update_state: bool = False
disable_history: bool = False

# all of these would typically be set by RoundBasedUseCase :-/
# but we need them here so that we can pass them on to the inner
# use-case
log_db: DbStorage = None
console: Console = None
llm: OpenAIConnection = None
tag: str = ""
max_turns: int = 10
low_llm: OpenAIConnection = None

def init(self):
super().init()

# simple helper that uses lse.sh to get hints from the system
def read_hint(self):

self.console.print("[green]performing initial enumeration with lse.sh")

run_cmd = "wget -q 'https://github.com/diego-treitos/linux-smart-enumeration/releases/latest/download/lse.sh' -O lse.sh;chmod 700 lse.sh; ./lse.sh -c -i -l 0 | grep -v 'nope$' | grep -v 'skip$'"

result, got_root = SSHRunCommand(conn=self.conn)(run_cmd, timeout=120)

self.console.print("[yellow]got the output: " + result)
cmd = self.llm.get_response(template_lse, lse_output=result, number=3)
self.console.print("[yellow]got the cmd: " + cmd.result)

return cmd.result

def run(self):
# read the hint
hint = self.read_hint()

for i in hint.splitlines():
self.console.print("[green]Now using Hint: " + i)

# call the inner use-case
priv_esc = LinuxPrivesc(
conn=self.conn, # must be set in sub classes
enable_explanation=self.enable_explanation,
disable_history=self.disable_history,
hint=i,
log_db = self.log_db,
console = self.console,
llm = self.low_llm,
tag = self.tag + "_hint_" +i,
max_turns = self.max_turns
)

priv_esc.init()
if priv_esc.run():
# we are root! w00t!
return True

@use_case("linux_privesc", "Linux Privilege Escalation")
@dataclass
class LinuxPrivesc(Privesc):
conn: SSHConnection = None
system: str = "linux"

def init(self):
super().init()
self._capabilities["run_command"] = SSHRunCommand(conn=self.conn)
self._capabilities["test_credential"] = SSHTestCredential(conn=self.conn)
7 changes: 7 additions & 0 deletions usecases/privesc/templates/get_hint_from_lse.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Create a list of up to ${number} attack classes that you would try on a linux system (to achieve root level privileges) given the following output:

~~~ bash
${lse_output}
~~~

only output the list of attack classes, for each attack class only output a single short sentence.
18 changes: 18 additions & 0 deletions usecases/privesc/windows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses import dataclass
from capabilities.psexec_run_command import PSExecRunCommand
from capabilities.psexec_test_credential import PSExecTestCredential
from usecases.base import use_case
from usecases.privesc.common import Privesc
from utils.psexec.psexec import PSExecConnection


@use_case("windows_privesc", "Windows Privilege Escalation")
@dataclass
class WindowsPrivesc(Privesc):
conn: PSExecConnection = None
system: str = "Windows"

def init(self):
super().init()
self._capabilities["run_command"] = PSExecRunCommand(conn=self.conn)
self._capabilities["test_credential"] = PSExecTestCredential(conn=self.conn)
1 change: 0 additions & 1 deletion usecases/usecase/__init__.py

This file was deleted.

4 changes: 2 additions & 2 deletions usecases/web/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from capabilities.record_note import RecordNote
from capabilities.submit_flag import SubmitFlag
from utils import LLMResult, tool_message
from usecases.usecase import use_case
from usecases.usecase.roundbased import RoundBasedUseCase
from usecases.base import use_case
from usecases.common_patterns import RoundBasedUseCase
from utils.configurable import parameter
from utils.openai.openai_lib import OpenAILib

Expand Down
2 changes: 1 addition & 1 deletion wintermute.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import argparse
import sys

from usecases import use_cases
from usecases.base import use_cases


def main():
Expand Down

0 comments on commit f79b6af

Please sign in to comment.