Skip to content

Commit 0cf164b

Browse files
authored
Merge pull request #111 from CoralStack/development
Merge RAG usecase into HackerBuddy thank you @Qsan1
2 parents d2421f7 + 680e50e commit 0cf164b

File tree

18 files changed

+500
-10
lines changed

18 files changed

+500
-10
lines changed

.env.example

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,14 @@ llm.model='gpt-3.5-turbo'
2020
llm.context_size=16385
2121

2222
# how many rounds should this thing go?
23-
max_turns = 20
23+
max_turns = 20
24+
25+
# The following four parameters are only relevant for the usecase rag
26+
# rag_database_folder_name: Name of the folder where the vector store will be saved.
27+
# rag_embedding: The name of the embedding model used. Currently only OpenAI api supported.
28+
# openai_api_key: API key that is used for the embedding model.
29+
# rag_return_token_limit: The upper bound for the RAG output.
30+
rag_database_folder_name = "vetorDB"
31+
rag_embedding = "text-embedding-3-small"
32+
openai_api_key = 'your-openai-key'
33+
rag_return_token_limit = 1000

pyproject.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ testing = ['pytest', 'pytest-mock']
6868
dev = [
6969
'ruff',
7070
]
71+
rag-usecase = [
72+
'langchain-community',
73+
'langchain-openai',
74+
'markdown',
75+
'chromadb',
76+
'langchain-chroma',
77+
]
7178

7279
[project.scripts]
7380
wintermute = "hackingBuddyGPT.cli.wintermute:main"

src/hackingBuddyGPT/capabilities/ssh_run_command.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@ class SSHRunCommand(Capability):
1717
timeout: int = 10
1818

1919
def describe(self) -> str:
    """Return the usage description shown to the LLM for this capability."""
    usage = "give a command to be executed and I will respond with the terminal output when running this command over SSH on the linux machine. The given command must not require user interaction. Do not use quotation marks in front and after your command."
    return usage
2121

2222
def get_name(self):
    # Keyword the model must prefix its response with to invoke this capability.
    capability_name = "exec_command"
    return capability_name
2424

2525
def __call__(self, command: str) -> Tuple[str, bool]:
2626
if command.startswith(self.get_name()):
2727
cmd_parts = command.split(" ", 1)
28-
command = cmd_parts[1]
28+
if len(cmd_parts) == 1:
29+
command = ""
30+
else:
31+
command = cmd_parts[1]
2932

3033
sudo_pass = Responder(
3134
pattern=r"\[sudo\] password for " + self.conn.username + ":",

src/hackingBuddyGPT/capabilities/ssh_test_credential.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from dataclasses import dataclass
22
from typing import Tuple
3-
3+
from paramiko.ssh_exception import SSHException
44
import paramiko
55

66
from hackingBuddyGPT.utils import SSHConnection
@@ -13,15 +13,29 @@ class SSHTestCredential(Capability):
1313
conn: SSHConnection
1414

1515
def describe(self) -> str:
    """Return the usage description shown to the LLM for this capability."""
    usage = "give credentials to be tested."
    return usage
1717

1818
def get_name(self):
    # Keyword the model must prefix its response with to invoke this capability.
    capability_name = "test_credential"
    return capability_name
2020

21-
def __call__(self, username: str, password: str, keyfilename: str) -> Tuple[str, bool]:
22-
test_conn = self.conn.new_with(username=username, password=password, keyfilename=keyfilename)
21+
def __call__(self, username: str, password: str) -> Tuple[str, bool]:
22+
test_conn = self.conn.new_with(username=username, password=password)
2323
try:
24-
test_conn.init()
24+
for attempt in range(10):
25+
try:
26+
test_conn.init()
27+
break;
28+
except paramiko.ssh_exception.AuthenticationException:
29+
return "Authentication error, credentials are wrong\n", False
30+
except SSHException as e:
31+
if attempt == 9:
32+
raise
33+
print("-------------------------------------------------------")
34+
print(e)
35+
print("Retrying")
36+
print("-------------------------------------------------------")
37+
38+
2539
user = test_conn.run("whoami")[0].strip("\n\r ")
2640
if user == "root":
2741
return "Login as root was successful\n", True

src/hackingBuddyGPT/usecases/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
from .web import *
44
from .web_api_testing import *
55
from .viewer import *
6+
from .rag import *
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# ThesisPrivescPrototype
2+
This usecase is an extension of `usecase/privesc`.
3+
4+
## Setup
5+
### Dependencies
6+
The needed dependencies can be downloaded with `pip install -e '.[rag-usecase]'`. If you encounter the error `unexpected keyword argument 'proxies'` after trying to start the usecase, try downgrading `httpx` to 0.27.2.
7+
### RAG vector store setup
8+
The code for the vector store setup can be found in `rag_utility.py`. Currently the vector store uses two sources: `GTFObins` and `hacktricks`. To use RAG, download the markdown files and place them in `rag_storage/GTFObinMarkdownfiles` (`rag_storage/hacktricksMarkdownFiles`). You can download the markdown files either from the respective GitHub repository ([GTFObin](https://github.com/GTFOBins/GTFOBins.github.io/tree/master), [hacktricks](https://github.com/HackTricks-wiki/hacktricks/tree/master/src/linux-hardening/privilege-escalation)) or scrape them from their website ([GTFObin](https://gtfobins.github.io/), [hacktricks](https://book.hacktricks.wiki/en/linux-hardening/privilege-escalation/index.html)).
9+
10+
New data sources can easily be added by adjusting `initiate_rag()` in `rag_utility.py`.
11+
12+
## Components
13+
### Analyze
14+
You can enable this component by adding `--enable_analysis ENABLE_ANALYSIS` to the command.
15+
16+
If enabled, the LLM will be prompted after each iteration and is asked to analyze the most recent output. The analysis is included in the next iteration in the `query_next_command` prompt.
17+
### Chain of Thought (CoT)
18+
You can enable this component by adding `--enable_chain_of_thought ENABLE_CHAIN_OF_THOUGHT` to the command.
19+
20+
If enabled, CoT is used to generate the next command. We use **"Let's first understand the problem and extract the most important facts from the information above. Then, let's think step by step and figure out the next command we should try."**
21+
### Retrieval Augmented Generation (RAG)
22+
You can enable this component by adding `--enable_rag ENABLE_RAG` to the command.
23+
24+
If enabled, after each iteration the LLM is prompted and asked to generate a search query for a vector store. The search query is then used to retrieve relevant documents from the vector store and the information is included in the prompt for the Analyze component (Only works if Analyze is enabled).
25+
### History Compression
26+
You can enable this component by adding `--enable_compressed_history ENABLE_COMPRESSED_HISTORY` to the command.
27+
28+
If enabled, instead of including all commands and their respective output in the prompt, it removes all outputs except the most recent one.
29+
### Structure via Prompt
30+
You can enable this component by adding `--enable_structure_guidance ENABLE_STRUCTURE_GUIDANCE` to the command.
31+
32+
If enabled, an initial set of command recommendations is included in the `query_next_command` prompt.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .linux import *
2+
from .rag_utility import *
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
import datetime
2+
import pathlib
3+
import re
4+
import os
5+
6+
from dataclasses import dataclass, field
7+
from mako.template import Template
8+
from typing import Any, Dict, Optional
9+
from langchain_core.vectorstores import VectorStoreRetriever
10+
11+
from hackingBuddyGPT.capabilities import Capability
12+
from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler
13+
from hackingBuddyGPT.usecases.agents import Agent
14+
from hackingBuddyGPT.usecases.rag import rag_utility as rag_util
15+
from hackingBuddyGPT.utils.logging import log_section, log_conversation
16+
from hackingBuddyGPT.utils import llm_util
17+
from hackingBuddyGPT.utils.cli_history import SlidingCliHistory
18+
19+
template_dir = pathlib.Path(__file__).parent / "templates"
20+
template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt"))
21+
template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt"))
22+
template_chain_of_thought = Template(filename=str(template_dir / "chain_of_thought.txt"))
23+
template_structure_guidance = Template(filename=str(template_dir / "structure_guidance.txt"))
24+
template_rag = Template(filename=str(template_dir / "rag_prompt.txt"))
25+
26+
27+
@dataclass
class ThesisPrivescPrototype(Agent):
    """Linux privilege-escalation agent with optional prompt components.

    Extends the base privesc agent with toggleable components: per-round
    output analysis, chain-of-thought prompting, RAG lookups against a
    local vector store, compressed command history, and structure
    guidance. Each toggle corresponds to a CLI flag; see the use case
    README for details.
    """

    # Free-text description of the target system, interpolated into prompts.
    system: str = ""
    # Analyze component: prompt the LLM to analyze the last round's output.
    enable_analysis: bool = False
    # NOTE(review): declared but never read anywhere in this file — confirm
    # whether a template or caller uses it before relying on it.
    enable_update_state: bool = False
    # Keep only the most recent command output in the prompt history.
    enable_compressed_history: bool = False
    # Disable prompt history entirely (no SlidingCliHistory is created).
    disable_history: bool = False
    # Chain-of-thought prompting; commands are extracted from <command> tags.
    enable_chain_of_thought: bool = False
    # Include an initial set of command recommendations in the prompt.
    enable_structure_guidance: bool = False
    # Retrieval-augmented generation against the local vector store.
    enable_rag: bool = False
    # Retriever over the vector store; populated in before_run() when RAG is on.
    _rag_document_retriever: Optional[VectorStoreRetriever] = None
    # Optional operator-supplied hint passed through to the prompts.
    hint: str = ""

    # Prompt history; stays None when disable_history is set.
    _sliding_history: Optional[SlidingCliHistory] = None
    _capabilities: Dict[str, Capability] = field(default_factory=dict)
    # Keyword arguments rendered into the query_next_command template.
    _template_params: Dict[str, Any] = field(default_factory=dict)
    # Token budget available for history, computed once in before_run().
    _max_history_size: int = 0
    # Carry-over text produced by the optional components in the last round.
    _analyze: str = ""
    _structure_guidance: str = ""
    _chain_of_thought: str = ""
    _rag_text: str = ""

    def before_run(self):
        """One-time setup: history, RAG retriever, template params, token budget."""
        if self.hint != "":
            self.log.status_message(f"[bold green]Using the following hint: '{self.hint}'")

        if self.disable_history is False:
            self._sliding_history = SlidingCliHistory(self.llm)

        if self.enable_rag:
            # Builds/loads the vector store; see rag_utility.initiate_rag().
            self._rag_document_retriever = rag_util.initiate_rag()

        self._template_params = {
            "capabilities": self.get_capability_block(),
            "system": self.system,
            "hint": self.hint,
            "conn": self.conn,
            "target_user": "root",
            'structure_guidance': self.enable_structure_guidance,
            'chain_of_thought': self.enable_chain_of_thought
        }

        # The guidance/CoT texts are the raw template sources, injected verbatim.
        if self.enable_structure_guidance:
            self._structure_guidance = template_structure_guidance.source

        if self.enable_chain_of_thought:
            self._chain_of_thought = template_chain_of_thought.source

        # Reserve room for the prompt template plus a safety margin; the rest
        # of the context window is available for history.
        template_size = self.llm.count_tokens(template_next_cmd.source)
        self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - template_size

    def perform_round(self, turn: int) -> bool:
        """Run one agent round; return True once root access was obtained."""
        # get the next command and run it
        cmd, message_id = self.get_next_command()

        if self.enable_chain_of_thought:
            # With CoT the model reasons freely and wraps the actual commands
            # in <command> tags; extract them (multi-line via [\s\S]).
            # command = re.findall("<command>(.*?)</command>", answer.result)
            command = re.findall(r"<command>([\s\S]*?)</command>", cmd)

            if len(command) > 0:
                command = "\n".join(command)
                cmd = command

        # split if there are multiple commands
        commands = self.split_into_multiple_commands(cmd)

        cmds, result, got_root = self.run_command(commands, message_id)

        # log and output the command and its result
        if self._sliding_history:
            if self.enable_compressed_history:
                self._sliding_history.add_command_only(cmds, result)
            else:
                self._sliding_history.add_command(cmds, result)

        if self.enable_rag:
            # Ask the LLM for a search query, retrieve matching documents and
            # trim them to the cap from the rag_return_token_limit env var.
            # NOTE(review): raises KeyError if that variable is unset — confirm
            # the launcher always exports it.
            query = self.get_rag_query(cmds, result)
            relevant_documents = self._rag_document_retriever.invoke(query.result)
            relevant_information = "".join([d.page_content + "\n" for d in relevant_documents])
            self._rag_text = llm_util.trim_result_front(self.llm, int(os.environ['rag_return_token_limit']),
                                                        relevant_information)

        # analyze the result..
        if self.enable_analysis:
            self.analyze_result(cmds, result)

        # if we got root, we can stop the loop
        return got_root

    def get_chain_of_thought_size(self) -> int:
        """Return the token cost of the CoT text (0 when disabled)."""
        if self.enable_chain_of_thought:
            return self.llm.count_tokens(self._chain_of_thought)
        else:
            return 0

    def get_structure_guidance_size(self) -> int:
        """Return the token cost of the guidance text (0 when disabled)."""
        if self.enable_structure_guidance:
            return self.llm.count_tokens(self._structure_guidance)
        else:
            return 0

    def get_analyze_size(self) -> int:
        """Return the token cost of the last analysis (0 when disabled)."""
        if self.enable_analysis:
            return self.llm.count_tokens(self._analyze)
        else:
            return 0

    def get_rag_size(self) -> int:
        """Return the token cost of the retrieved RAG text (0 when disabled)."""
        if self.enable_rag:
            return self.llm.count_tokens(self._rag_text)
        else:
            return 0

    @log_conversation("Asking LLM for a new command...", start_section=True)
    def get_next_command(self) -> tuple[str, int]:
        """Query the LLM for the next command; return (raw response, message id)."""
        history = ""
        if not self.disable_history:
            # Shrink the history budget by whatever the enabled extras consume.
            if self.enable_compressed_history:
                history = self._sliding_history.get_commands_and_last_output(self._max_history_size - self.get_chain_of_thought_size() - self.get_structure_guidance_size() - self.get_analyze_size())
            else:
                history = self._sliding_history.get_history(self._max_history_size - self.get_chain_of_thought_size() - self.get_structure_guidance_size() - self.get_analyze_size())

        self._template_params.update({
            "history": history,
            'CoT': self._chain_of_thought,
            'analyze': self._analyze,
            'guidance': self._structure_guidance
        })

        cmd = self.llm.get_response(template_next_cmd, **self._template_params)
        message_id = self.log.call_response(cmd)

        # return llm_util.cmd_output_fixer(cmd.result), message_id
        return cmd.result, message_id

    @log_conversation("Asking LLM for a search query...", start_section=True)
    def get_rag_query(self, cmd, result):
        """Ask the LLM for a vector-store search query based on cmd/result."""
        ctx = self.llm.context_size
        # Trim the command output so template + output fit the context window.
        template_size = self.llm.count_tokens(template_rag.source)
        target_size = ctx - llm_util.SAFETY_MARGIN - template_size
        result = llm_util.trim_result_front(self.llm, target_size, result)

        result = self.llm.get_response(template_rag, cmd=cmd, resp=result)
        self.log.call_response(result)
        return result

    @log_section("Executing that command...")
    def run_command(self, cmd, message_id) -> tuple[Optional[str], Optional[str], bool]:
        """Execute each parsed command via its capability.

        Returns (joined commands, joined outputs, got_root). Aborts early and
        returns the parser's error text when a command cannot be parsed.
        """
        _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities, default_capability=self._default_capability)

        cmds = ""
        result = ""
        got_root = False
        for i, command in enumerate(cmd):
            start_time = datetime.datetime.now()
            success, *output = parser(command)
            if not success:
                # Parse failure: log it and stop executing the remaining commands.
                self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=command, result_text=output[0], duration=0)
                return cmds, output[0], False
            assert len(output) == 1
            capability, cmd_, (result_, got_root_) = output[0]
            cmds += cmd_ + "\n"
            result += result_ + "\n"
            # Root achieved by any single command counts for the whole round.
            got_root = got_root or got_root_
            duration = datetime.datetime.now() - start_time
            self.log.add_tool_call(message_id, tool_call_id=i, function_name=capability, arguments=cmd_,
                                   result_text=result_, duration=duration)

        cmds = cmds.rstrip()
        result = result.rstrip()
        return cmds, result, got_root

    @log_conversation("Analyze its result...", start_section=True)
    def analyze_result(self, cmd, result):
        """Have the LLM analyze the round's output; stores it in self._analyze."""
        ctx = self.llm.context_size

        # Leave room for the template, the safety margin and the RAG text.
        template_size = self.llm.count_tokens(template_analyze.source)
        target_size = ctx - llm_util.SAFETY_MARGIN - template_size - self.get_rag_size()
        result = llm_util.trim_result_front(self.llm, target_size, result)

        result = self.llm.get_response(template_analyze, cmd=cmd, resp=result, rag_enabled=self.enable_rag, rag_text=self._rag_text, hint=self.hint)
        self._analyze = result.result
        self.log.call_response(result)

    def split_into_multiple_commands(self, response: str):
        """Split an LLM response into per-capability command strings."""
        ret = self.split_with_delimiters(response, ["test_credential", "exec_command"])

        # strip trailing newlines
        ret = [r.rstrip() for r in ret]

        # re.split with a capturing delimiter always yields the text before the
        # first delimiter as element 0; when the response starts with a
        # capability keyword that element is empty, so drop it.
        # NOTE(review): any non-empty text before the first keyword is
        # discarded here as well — confirm that is intended.
        if len(ret) > 1:
            ret = ret[1:]

        # combine keywords with their corresponding input
        if len(ret) > 1:
            ret = [ret[i] + ret[i + 1] for i in range(0, len(ret) - 1, 2)]
        return ret

    def split_with_delimiters(self, input: str, delimiters):
        """Split *input* on any delimiter while keeping the delimiters."""
        # Create a regex pattern to match any of the delimiters
        regex_pattern = f"({'|'.join(map(re.escape, delimiters))})"
        # Use re.split to split the text while keeping the delimiters
        return re.split(regex_pattern, input)

0 commit comments

Comments
 (0)