Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: postgres #557

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ function initializeDatabase(dataDir: string) {
if (process.env.POSTGRES_URL) {
const db = new PostgresDatabaseAdapter({
connectionString: process.env.POSTGRES_URL,
parseInputs: true,
});
return db;
} else {
Expand Down
20 changes: 11 additions & 9 deletions packages/adapter-postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ export class PostgresDatabaseAdapter
while (retryCount < maxRetries) {
try {
const delay = baseDelay * Math.pow(2, retryCount);
elizaLogger.log(`Attempting to reconnect in ${delay}ms...`);
elizaLogger.warn(
`Attempting to reconnect in ${delay}ms...`
);
await new Promise((resolve) => setTimeout(resolve, delay));

// Create new pool with same config
this.pool = new pg.Pool(this.pool.options);
await this.testConnection();

elizaLogger.log("Successfully reconnected to database");
elizaLogger.success("Successfully reconnected to database");
return;
} catch (error) {
retryCount++;
Expand Down Expand Up @@ -116,7 +118,7 @@ export class PostgresDatabaseAdapter
try {
client = await this.pool.connect();
const result = await client.query("SELECT NOW()");
elizaLogger.log(
elizaLogger.success(
"Database connection test successful:",
result.rows[0]
);
Expand Down Expand Up @@ -215,7 +217,7 @@ export class PostgresDatabaseAdapter
if (rows.length === 0) return null;

const account = rows[0];
elizaLogger.log("account", account);
elizaLogger.debug("account", account);
return {
...account,
details:
Expand Down Expand Up @@ -346,7 +348,7 @@ export class PostgresDatabaseAdapter
if (!params.roomId) throw new Error("roomId is required");
let sql = `SELECT * FROM memories WHERE type = $1 AND "agentId" = $2 AND "roomId" = $3`;
const values: any[] = [params.tableName, params.agentId, params.roomId];
let paramCount = 2;
let paramCount = 3; // Updated to start at 3 since we already have 3 parameters

if (params.start) {
paramCount++;
Expand All @@ -366,9 +368,9 @@ export class PostgresDatabaseAdapter

sql += ' ORDER BY "createdAt" DESC';

if (params.count) {
if (params.count && typeof params.count === "number") {
paramCount++;
sql += ` LIMIT $${paramCount}`;
sql += ` LIMIT $${paramCount}::integer`; // Cast to integer
values.push(params.count);
}

Expand Down Expand Up @@ -628,7 +630,7 @@ export class PostgresDatabaseAdapter
);

if (existingParticipant.rows.length > 0) {
elizaLogger.log(
elizaLogger.error(
`Participant with userId ${userId} already exists in room ${roomId}.`
);
return true; // Exit early if the participant already exists
Expand All @@ -643,7 +645,7 @@ export class PostgresDatabaseAdapter
return true;
} catch (error) {
if (error instanceof DatabaseError) {
elizaLogger.log("Error adding participant", error);
elizaLogger.error("Error adding participant", error);
// This is to prevent duplicate participant error in case of a race condition
// Handle unique constraint violation error (code 23505)
if (error.code === "23505") {
Expand Down
178 changes: 129 additions & 49 deletions packages/client-discord/src/voice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import {
NoSubscriberBehavior,
StreamType,
VoiceConnection,
VoiceConnectionStatus,
createAudioPlayer,
createAudioResource,
getVoiceConnection,
joinVoiceChannel,
entersState,
} from "@discordjs/voice";
import {
BaseGuildVoiceChannel,
Expand Down Expand Up @@ -230,6 +232,7 @@ export class VoiceManager extends EventEmitter {
console.error("Error leaving voice channel:", error);
}
}

const connection = joinVoiceChannel({
channelId: channel.id,
guildId: channel.guild.id,
Expand All @@ -238,38 +241,103 @@ export class VoiceManager extends EventEmitter {
selfMute: false,
});

const me = channel.guild.members.me;
if (me?.voice && me.permissions.has("DeafenMembers")) {
await me.voice.setDeaf(false);
await me.voice.setMute(false);
} else {
elizaLogger.log("Bot lacks permission to modify voice state");
}
try {
// Wait for either Ready or Signalling state
await Promise.race([
entersState(connection, VoiceConnectionStatus.Ready, 20_000),
entersState(
connection,
VoiceConnectionStatus.Signalling,
20_000
),
]);

// Log connection success
elizaLogger.log(
`Voice connection established in state: ${connection.state.status}`
);

for (const [, member] of channel.members) {
if (!member.user.bot) {
this.monitorMember(member, channel);
}
}
// Set up ongoing state change monitoring
connection.on("stateChange", async (oldState, newState) => {
elizaLogger.log(
`Voice connection state changed from ${oldState.status} to ${newState.status}`
);

connection.on("error", (error) => {
console.error("Voice connection error:", error);
});
if (newState.status === VoiceConnectionStatus.Disconnected) {
elizaLogger.log("Handling disconnection...");

connection.receiver.speaking.on("start", (userId: string) => {
const user = channel.members.get(userId);
if (!user?.user.bot) {
this.monitorMember(user as GuildMember, channel);
this.streams.get(userId)?.emit("speakingStarted");
try {
// Try to reconnect if disconnected
await Promise.race([
entersState(
connection,
VoiceConnectionStatus.Signalling,
5_000
),
entersState(
connection,
VoiceConnectionStatus.Connecting,
5_000
),
]);
// Seems to be reconnecting to a new channel
elizaLogger.log("Reconnecting to channel...");
} catch (e) {
// Seems to be a real disconnect, destroy and cleanup
elizaLogger.log(
"Disconnection confirmed - cleaning up..." + e
);
connection.destroy();
this.connections.delete(channel.id);
}
} else if (
newState.status === VoiceConnectionStatus.Destroyed
) {
this.connections.delete(channel.id);
} else if (
!this.connections.has(channel.id) &&
(newState.status === VoiceConnectionStatus.Ready ||
newState.status === VoiceConnectionStatus.Signalling)
) {
this.connections.set(channel.id, connection);
}
});

connection.on("error", (error) => {
elizaLogger.log("Voice connection error:", error);
// Don't immediately destroy - let the state change handler deal with it
elizaLogger.log(
"Connection error - will attempt to recover..."
);
});

// Store the connection
this.connections.set(channel.id, connection);

// Continue with voice state modifications
const me = channel.guild.members.me;
if (me?.voice && me.permissions.has("DeafenMembers")) {
try {
await me.voice.setDeaf(false);
await me.voice.setMute(false);
} catch (error) {
elizaLogger.log("Failed to modify voice state:", error);
// Continue even if this fails
}
}
});

connection.receiver.speaking.on("end", async (userId: string) => {
const user = channel.members.get(userId);
if (!user?.user.bot) {
this.streams.get(userId)?.emit("speakingStopped");
// Set up member monitoring
for (const [, member] of channel.members) {
if (!member.user.bot) {
await this.monitorMember(member, channel);
}
}
});
} catch (error) {
elizaLogger.log("Failed to establish voice connection:", error);
connection.destroy();
this.connections.delete(channel.id);
throw error;
}
}

private async monitorMember(
Expand Down Expand Up @@ -780,7 +848,7 @@ export class VoiceManager extends EventEmitter {

audioPlayer.on(
"stateChange",
(oldState: any, newState: { status: string }) => {
(_oldState: any, newState: { status: string }) => {
if (newState.status == "idle") {
const idleTime = Date.now();
console.log(
Expand All @@ -792,34 +860,46 @@ export class VoiceManager extends EventEmitter {
}

async handleJoinChannelCommand(interaction: any) {
const channelId = interaction.options.get("channel")?.value as string;
if (!channelId) {
await interaction.reply("Please provide a voice channel to join.");
return;
}
const guild = interaction.guild;
if (!guild) {
return;
}
const voiceChannel = interaction.guild.channels.cache.find(
(channel: VoiceChannel) =>
channel.id === channelId &&
channel.type === ChannelType.GuildVoice
);
try {
// Defer the reply immediately to prevent interaction timeout
await interaction.deferReply();

const channelId = interaction.options.get("channel")
?.value as string;
if (!channelId) {
await interaction.editReply(
"Please provide a voice channel to join."
);
return;
}

if (!voiceChannel) {
await interaction.reply("Voice channel not found!");
return;
}
const guild = interaction.guild;
if (!guild) {
await interaction.editReply("Could not find guild.");
return;
}

try {
this.joinChannel(voiceChannel as BaseGuildVoiceChannel);
await interaction.reply(
const voiceChannel = interaction.guild.channels.cache.find(
(channel: VoiceChannel) =>
channel.id === channelId &&
channel.type === ChannelType.GuildVoice
);

if (!voiceChannel) {
await interaction.editReply("Voice channel not found!");
return;
}

await this.joinChannel(voiceChannel as BaseGuildVoiceChannel);
await interaction.editReply(
`Joined voice channel: ${voiceChannel.name}`
);
} catch (error) {
console.error("Error joining voice channel:", error);
await interaction.reply("Failed to join the voice channel.");
// Use editReply instead of reply for the error case
await interaction
.editReply("Failed to join the voice channel.")
.catch(console.error);
}
}

Expand Down