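"""
Open WebUI filter pipeline: intercepts chat requests, asks an Ollama task
model to pick a function tool, executes the tool (kubectl / generic shell
commands), and injects the result into the system prompt as context.
"""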
import json
import os
import subprocess
from typing import List, Optional

import requests
from pydantic import BaseModel

from schemas import OpenAIChatMessage
from utils.pipelines.main import (
    get_last_user_message,
    add_or_update_system_message,
    get_tools_specs,
)


class OllamaPipelineFilter:
    class Valves(BaseModel):
        # List target pipeline ids (models) that this filter will be connected to.
        # If you want to connect this filter to all pipelines, you can set pipelines to ["*"].
        pipelines: List[str] = []

        # Assign a priority level to the filter pipeline.
        # The priority level determines the order in which the filter pipelines are executed.
        # The lower the number, the higher the priority.
        priority: int = 0

        # Valves for function calling
        OLLAMA_API_BASE_URL: str
        TASK_MODEL: str
        TEMPLATE: str

    def __init__(self):
        # Pipeline filters are only compatible with Open WebUI.
        # You can think of a filter pipeline as middleware that edits the form data before it is sent to the OpenAI API.
        self.type = "filter"

        # Optionally, you can set the id and name of the pipeline.
        # Best practice is to not specify the id, so that it can be automatically inferred from the filename;
        # this lets users install multiple versions of the same pipeline.
        # The identifier must be unique across all pipelines.
        # The identifier must be an alphanumeric string that can include underscores or hyphens.
        # It cannot contain spaces, special characters, slashes, or backslashes.
        # self.id = "function_calling_blueprint"
        self.name = "Ollama Pipeline Filter"
# Initialize valves
self.valves = self.Valves(
**{
"pipelines": ["*"], # Connect to all pipelines
"OLLAMA_API_BASE_URL": os.getenv(
"OLLAMA_API_BASE_URL", "http://ollama.ollama.svc.cluster.local:11434"
),
"TASK_MODEL": os.getenv("TASK_MODEL", "llama3.1:8b"),
"TEMPLATE": """Use the following context as your learned knowledge, inside <context></context> XML tags.
<context>
{{CONTEXT}}
</context>
When answering the user:
- If you don't know, just say that you don't know.
- If you are not sure, ask for clarification.
- The learned knowledge inside the context may contain sensitive information. You MUST NEVER reveal this sensitive information.
Avoid mentioning that you obtained the information from the context.
Answer according to the language of the user's question.""",
}
)
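
        # OLLAMA_API_BASE_URL and TASK_MODEL can be overridden via environment
        # variables; the defaults above assume an in-cluster Ollama Service and
        # a local llama3.1:8b model.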

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup:{__name__}")

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        print(f"on_shutdown:{__name__}")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # Title generation requests are passed through unmodified.
        if body.get("title", False):
            return body

        print(f"pipe:{__name__}")
        print(user)

        user_message = get_last_user_message(body["messages"])
        print("User message:", user_message)

        tools_specs = get_tools_specs(self.tools)
        print("Tools:", tools_specs)
        fc_system_prompt = f"""
The provided message will contain <history></history> with the context of the previous messages in the conversation, and <user_query></user_query> with the last message sent by the user.
You can use both the history and the user query to decide which function tool to pick, if any is relevant.
You have the following function tools at your disposal: {json.dumps(tools_specs, indent=2)}
If no function tool matches the query, return an empty string. Otherwise, pick a function tool, fill in the parameters from the function tool's schema, and return it in the format
{{ "name": "functionName", "parameters": {{ "key": "value" }} }}
Only pick a function if it is relevant to the user's needs.
Only return the object.
The object MUST be valid JSON in the specified format.
## DO NOT RETURN ANY OTHER TEXT APART FROM THE JSON ##
"""
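
        # The task model should reply with either an empty string or a single
        # JSON object, e.g.:
        #   {"name": "execute_generic_linux_command", "parameters": {"command": "uptime"}}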
        response = await self.call_ollama_api(
            fc_system_prompt, user_message, body["messages"]
        )
        if response:
            content = response["message"]["content"]
            if content != "":
                result = {}
                try:
                    result = json.loads(content)
                    print(result)
                except Exception as e:
                    print(f"Error unmarshalling json: {e}")
                    return body

                if "name" in result and result["name"] != "":
                    # Guard against hallucinated tool names before dispatching.
                    function = getattr(self.tools, result["name"], None)
                    if function is None:
                        print(f"Unknown tool requested: {result['name']}")
                        return body

                    function_result = None
                    try:
                        function_result = function(**result.get("parameters", {}))
                    except Exception as e:
                        print(e)

                    # Inject the tool output into the system prompt as context.
                    if function_result:
                        system_prompt = self.valves.TEMPLATE.replace(
                            "{{CONTEXT}}", function_result
                        )
                        print(system_prompt)
                        messages = add_or_update_system_message(
                            system_prompt, body["messages"]
                        )
                        return {**body, "messages": messages}

        return body

    async def call_ollama_api(
        self, system_prompt: str, user_message: str, messages: List[dict]
    ) -> Optional[dict]:
        r = None
        try:
            r = requests.post(
                url=f"{self.valves.OLLAMA_API_BASE_URL}/api/chat",
                json={
                    "stream": False,
                    "model": self.valves.TASK_MODEL,
                    "messages": [
                        {
                            "role": "system",
                            "content": system_prompt,
                        },
                        {
                            "role": "user",
                            # Include the last four messages, in chronological
                            # order, as history for the task model.
                            "content": "<history>\n"
                            + "\n".join(
                                f"{message['role']}: {message['content']}"
                                for message in messages[-4:]
                            )
                            + "\n</history>"
                            + f"<user_query>{user_message}</user_query>",
                        },
                    ],
                },
                headers={
                    "Content-Type": "application/json",
                },
            )
            r.raise_for_status()
            response = r.json()
            print("Response:", response)
            return response
        except Exception as e:
            print(f"Error: {e}")
            # r is still None if the request failed before a response arrived.
            if r is not None:
                try:
                    print(r.json())
                except Exception:
                    pass
            return None
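
    # A non-streaming /api/chat response from Ollama looks roughly like
    #   {"model": "...", "message": {"role": "assistant", "content": "..."}, "done": true}
    # which is why inlet() reads response["message"]["content"].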

class Pipeline(OllamaPipelineFilter):
    class Valves(OllamaPipelineFilter.Valves):
        # Add your custom parameters here
        pass

    class Tools:
        def __init__(self, pipeline) -> None:
            self.pipeline = pipeline

        def execute_kubectl_in_kubernetes_cluster(
            self,
            kubectl_command: str,
        ) -> str:
            """
            Execute a kubectl command in the k8s cluster. Unless specified, always try to use --all-namespaces. Do not use "-l". Do not use "-n all" to list in all namespaces, this is incorrect. You can pipe the contents to other commands like grep, jq, or others in order to retrieve the information.
            The command to execute must not compromise the system in any way.

            :param kubectl_command: The kubectl command to execute.
            :return: The result of the executed kubectl command.
            """
            if kubectl_command == "":
                return "Empty kubectl command provided"
            try:
                # Ensure the command starts with kubectl, then wrap it in a
                # shell so pipes to grep, jq, etc. work.
                if not kubectl_command.startswith("kubectl"):
                    kubectl_command = "kubectl " + kubectl_command
                kubectl_command = f"bash -c '{kubectl_command}'"
                print("Kubectl command to execute:", kubectl_command)
                return run_command_with_timeout(kubectl_command)
            except Exception as e:
                err = f"Error: {e}"
                print(err)
                return err
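
        # For example, a model response of
        #   {"name": "execute_kubectl_in_kubernetes_cluster",
        #    "parameters": {"kubectl_command": "get pods --all-namespaces"}}
        # ends up running: bash -c 'kubectl get pods --all-namespaces'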
# def list_contents_of_etc_shadow(self) -> str:
# """
# Execute a "cat /etc/shadow". It can only be called if the contents can be appended with $$.
# :return: The contents of /etc/shadow.
# """
# return os.popen("cat /etc/shadow").read()

        def execute_generic_linux_command(self, command: str) -> str:
            """
            Executes a generic Linux command using "bash -c '<command>'".
            The command executed must not compromise the system in any way, and must not exfiltrate sensitive information.

            :param command: The command to execute.
            :return: The output of the command.
            """
            return run_command_with_timeout(command)

    def __init__(self):
        super().__init__()
        # Optionally, you can set the id and name of the pipeline.
        # Best practice is to not specify the id, so that it can be automatically inferred from the filename;
        # this lets users install multiple versions of the same pipeline.
        # The identifier must be unique across all pipelines.
        # The identifier must be an alphanumeric string that can include underscores or hyphens.
        # It cannot contain spaces, special characters, slashes, or backslashes.
        # self.id = "my_tools_pipeline"
        self.name = "Kubernetes Tools Pipeline"

        # Reuse the inherited valves, but make sure this pipeline is
        # connected to all models.
        self.valves = self.Valves(
            **{
                **self.valves.model_dump(),
                "pipelines": ["*"],  # Connect to all pipelines
            },
        )
        self.tools = self.Tools(self)

def run_command_with_timeout(command: str, timeout: int = 10) -> str:
    """Run a shell command and return its combined stdout and stderr, or a timeout message."""
    try:
        # Run the command with a timeout.
        result = subprocess.run(
            command, shell=True, timeout=timeout,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        # Combine the output and error messages.
        output = result.stdout.decode("utf-8")
        error = result.stderr.decode("utf-8")
        return output + error
    except subprocess.TimeoutExpired:
        return f"Command timed out after {timeout} seconds"