-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathai_helpers.py
155 lines (115 loc) · 4.87 KB
/
ai_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import json
from configparser import ConfigParser
from datetime import datetime
from typing import Union, Tuple
from pathlib import Path
import ffmpeg
from PIL import Image
from db_utils import get_thread_id
from openai import AsyncOpenAI
def get_config():
    """Load the application settings from ``config.ini``.

    The file name is relative to the current working directory and the
    file is parsed again on every call, so edits are picked up without a
    restart. ``ConfigParser.read`` silently skips files it cannot open,
    so a missing file yields an empty parser rather than an error.

    Returns:
        ConfigParser: the freshly populated parser.
    """
    settings = ConfigParser()
    settings.read("config.ini")
    return settings
async def new_thread_response(
    thread_name: str,
    prompt: str = "",
    guild_id: str = "",
    response_format: Union[str, dict] = "auto",
    openai_client: AsyncOpenAI = AsyncOpenAI(),
):
    """Send ``prompt`` to an assistant thread and return the newest message.

    The assistant id is read from the ``OPENAI_ASSISTANTS`` section of the
    config under the key ``thread_name``; the thread id comes from
    ``get_thread_id`` for the given guild and thread name.

    Args:
        thread_name: Config key of the assistant, also used as the thread name.
        prompt: Text posted to the thread as a ``user`` message.
        guild_id: Guild identifier forwarded to ``get_thread_id``.
        response_format: Forwarded unchanged to the run creation call.
        openai_client: Client used for every API call.
            NOTE: the default instance is built once, when this function is
            defined (i.e. at import time), and shared by all calls.

    Returns:
        The first entry of the thread's message list after the run finished.
        The list is expected to be newest-first (API default ordering), so
        this is the assistant's latest reply.

    Raises:
        RuntimeError: if the run ends in any state other than ``completed``,
            or if the thread has no messages afterwards.
    """
    config = get_config()
    assistant_id = config.get("OPENAI_ASSISTANTS", thread_name)
    thread_id = await get_thread_id(guild_id=guild_id, name=thread_name, openai_client=openai_client)

    # add a message to the thread
    await openai_client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=prompt,
    )

    # run the thread with the assistant and wait for a terminal state
    run = await openai_client.beta.threads.runs.create_and_poll(
        thread_id=thread_id, assistant_id=assistant_id, response_format=response_format
    )

    # Without this check a failed/expired/cancelled run would fall through and
    # the newest message in the thread -- the user's own prompt -- would be
    # handed back as if it were the assistant's answer.
    if run.status != "completed":
        raise RuntimeError(
            f"Assistant run for thread '{thread_name}' ended with status '{run.status}'"
        )

    messages = await openai_client.beta.threads.messages.list(thread_id=thread_id)
    if not messages.data:
        raise RuntimeError(f"Thread '{thread_name}' returned no messages after the run")

    # get the most recent response from the assistant
    return messages.data[0]
async def generate_speech(
    guild_id: str, compartment: str, file_name: str, tts: str, openai_client: AsyncOpenAI = AsyncOpenAI()
) -> Path:
    """Synthesize ``tts`` as speech and stream the audio into a file.

    Model, voice and audio format are read from the ``OPENAI_GENERAL``
    config section (``speech_model``, ``voice``, ``speech_file_format``),
    falling back to ``tts-1``, ``onyx`` and ``wav`` respectively.

    Args:
        guild_id: Guild identifier, forwarded to ``content_path``.
        compartment: Sub-directory name, forwarded to ``content_path``.
        file_name: Name of the audio file to write.
        tts: Text to be spoken.
        openai_client: Client used for the speech request.

    Returns:
        Path of the written audio file.
    """
    settings = get_config()
    speech_options = {
        "model": settings.get("OPENAI_GENERAL", "speech_model", fallback="tts-1"),
        "voice": settings.get("OPENAI_GENERAL", "voice", fallback="onyx"),
        "input": tts,
        "response_format": settings.get("OPENAI_GENERAL", "speech_file_format", fallback="wav"),
    }

    streaming_api = openai_client.audio.speech.with_streaming_response
    async with streaming_api.create(**speech_options) as audio_stream:
        # The destination is resolved only once the request has been opened,
        # so no directory is created when the request itself fails.
        destination = content_path(guild_id=guild_id, compartment=compartment, file_name=file_name)
        await audio_stream.stream_to_file(destination)

    return destination
async def speak_and_spell(
    thread_name: str,
    prompt: str,
    guild_id: str,
    compartment: str = "default",
    openai_client: AsyncOpenAI = AsyncOpenAI(),
) -> Tuple[str, Path]:
    """Ask an assistant for a reply and render that reply as speech.

    Args:
        thread_name: Assistant/thread name passed to ``new_thread_response``.
        prompt: Text sent to the assistant.
        guild_id: Guild identifier used for the thread and the output path.
        compartment: Sub-directory for the audio file.
        openai_client: Client used for both the chat and the speech request.

    Returns:
        A ``(text, path)`` tuple: the reply text and the audio file
        produced by ``generate_speech``.
    """
    response = await new_thread_response(
        thread_name=thread_name, prompt=prompt, guild_id=guild_id, openai_client=openai_client
    )
    # Reads the text of the first content part of the reply.
    tts = response.content[0].text.value

    # generate_speech picks the audio encoding from this same config key, so
    # the file extension is derived from it as well; a hard-coded ".wav" would
    # mislabel the file whenever another format is configured.
    audio_ext = get_config().get("OPENAI_GENERAL", "speech_file_format", fallback="wav")

    file_path = await generate_speech(
        guild_id=guild_id,
        compartment=compartment,
        tts=tts,
        file_name=f"{response.id}.{audio_ext}",
        openai_client=openai_client,
    )
    return tts, file_path
async def gs_intro_song(guild_id: str, name: str, openai_client: AsyncOpenAI = AsyncOpenAI()):
    """Create a spoken intro mixed over the game-show theme music.

    The prompt is read from the ``PROMPTS`` config section under the key
    ``"<name>_theme"`` and sent to the ``gs_host`` thread; the spoken reply
    is then mixed with ``gameshow.mp3`` (resolved relative to the current
    working directory) using ffmpeg.

    Args:
        guild_id: Guild identifier used for the thread and output paths.
        name: Prefix of the prompt key in the ``PROMPTS`` section.
        openai_client: Client forwarded to ``speak_and_spell``.

    Returns:
        A ``(text, path)`` tuple: the spoken text and the mixed audio file.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    config = get_config()
    prompt = config.get("PROMPTS", f"{name}_theme")
    tts, file_path = await speak_and_spell(
        thread_name="gs_host", prompt=prompt, guild_id=guild_id, compartment="theme", openai_client=openai_client
    )

    # ffmpeg work to combine streams
    ## load both audio files (paths passed as str, same as the output below)
    theme_song = ffmpeg.input("gameshow.mp3").audio
    words = ffmpeg.input(str(file_path)).audio

    ## adjust volumes: music turned down, speech boosted
    theme_song = theme_song.filter("volume", 0.25)
    words = words.filter("volume", 3)

    ## merge and output
    dt_string = datetime.now().strftime("%Y%m%d%H%M%S")
    output_file = content_path(guild_id=guild_id, compartment="theme", file_name=f"intro_{dt_string}.wav")
    merged_audio = ffmpeg.filter((theme_song, words), "amix")
    out = ffmpeg.output(merged_audio, str(output_file)).overwrite_output()

    # out.run() spawns ffmpeg and blocks until it exits. Calling it directly
    # inside a coroutine would stall the event loop for the whole encode, so
    # it is pushed onto the default executor instead. Any exception raised by
    # run() still propagates through the await.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, lambda: out.run(quiet=True))

    return tts, output_file
async def get_trivia_question(guild_id: str, openai_client: AsyncOpenAI = AsyncOpenAI()) -> dict:
    """Request a trivia question from the ``trivia_game`` assistant.

    The prompt is taken from the ``PROMPTS`` config section (key
    ``trivia_game``) and the run is asked for a JSON-object response.

    Args:
        guild_id: Guild identifier used to pick the thread.
        openai_client: Client forwarded to ``new_thread_response``.

    Returns:
        The reply text decoded with ``json.loads``.
    """
    prompt_text = get_config().get("PROMPTS", "trivia_game")
    reply = await new_thread_response(
        thread_name="trivia_game",
        prompt=prompt_text,
        guild_id=guild_id,
        response_format={"type": "json_object"},
        openai_client=openai_client,
    )
    # Text of the first content part of the reply, decoded as JSON.
    raw_payload = reply.content[0].text.value
    return json.loads(raw_payload)
def content_path(guild_id: str, compartment: str, file_name: str) -> Path:
    """Build the path for a generated file, creating its directory.

    Layout: ``generated_content/guild_<guild_id>/<session>/<compartment>/<file_name>``
    where ``<session>`` is the current local time formatted with the
    ``session_strftime`` pattern from the ``GENERAL`` config section.

    Args:
        guild_id: Guild identifier embedded in the directory name.
        compartment: Sub-directory grouping related files.
        file_name: Name of the file inside that directory.

    Returns:
        Path of the file. The directory is created if needed; the file
        itself is not.
    """
    config = get_config()
    # The fallback must be a strftime pattern. The previous fallback was the
    # literal "dall-e-2" (a model name), which strftime returns unchanged, so
    # every session ended up in a directory called "dall-e-2".
    session_format = config.get("GENERAL", "session_strftime", fallback="%Y%m%d")
    ts = datetime.now().strftime(session_format)
    dir_path = Path(f"generated_content/guild_{guild_id}/{ts}/{compartment}")
    dir_path.mkdir(parents=True, exist_ok=True)
    return dir_path / file_name
def is_square_image(image_path: Path):
    """Tell whether the image at ``image_path`` is as wide as it is tall.

    Args:
        image_path: Location of the image file to open.

    Returns:
        bool: ``True`` when width equals height, ``False`` otherwise.
    """
    with Image.open(image_path) as picture:
        # Only the dimensions are needed, so they are read before the
        # file is closed again.
        side_a, side_b = picture.size
    return side_a == side_b
def dict_to_ordered_string(data: dict) -> str:
    """Render a mapping as a numbered list ranked by value, highest first.

    Each line has the form ``"<rank>. **<key>**: <value>"``. Entries with
    equal values keep their original relative order. An empty mapping
    yields an empty string.

    Args:
        data: Mapping whose values can be compared with each other.

    Returns:
        The lines joined with newlines.
    """
    # Highest value first; the sort is stable, so ties keep insertion order.
    ranked = sorted(data.items(), key=lambda pair: pair[1], reverse=True)

    lines = []
    for rank, (label, score) in enumerate(ranked, start=1):
        lines.append(f"{rank}. **{label}**: {score}")
    return "\n".join(lines)