diff --git a/packages/adapter-pglite/src/index.ts b/packages/adapter-pglite/src/index.ts index 5f65ff7989f..187a5f17b62 100644 --- a/packages/adapter-pglite/src/index.ts +++ b/packages/adapter-pglite/src/index.ts @@ -14,6 +14,7 @@ import { getEmbeddingConfig, DatabaseAdapter, EmbeddingProvider, + RAGKnowledgeItem, } from "@elizaos/core"; import fs from "fs"; import { fileURLToPath } from "url"; @@ -1282,6 +1283,247 @@ export class PGLiteDatabaseAdapter }, "deleteCache")) ?? false ); } + + async getKnowledge(params: { + id?: UUID; + agentId: UUID; + limit?: number; + query?: string; + }): Promise { + return this.withDatabase(async () => { + try { + let sql = `SELECT * FROM knowledge WHERE ("agentId" = $1 OR "isShared" = true)`; + const queryParams: any[] = [params.agentId]; + let paramCount = 1; + + if (params.id) { + paramCount++; + sql += ` AND id = $${paramCount}`; + queryParams.push(params.id); + } + + if (params.limit) { + paramCount++; + sql += ` LIMIT $${paramCount}`; + queryParams.push(params.limit); + } + + const { rows } = await this.query( + sql, + queryParams + ); + + return rows.map((row) => ({ + id: row.id, + agentId: row.agentId, + content: + typeof row.content === "string" + ? JSON.parse(row.content) + : row.content, + embedding: row.embedding + ? new Float32Array(row.embedding) + : undefined, + createdAt: row.createdAt + ? new Date(row.createdAt).getTime() + : undefined, + })); + } catch (error) { + elizaLogger.error("Error getting knowledge", { + error: + error instanceof Error ? error.message : String(error), + id: params.id, + agentId: params.agentId, + }); + throw new Error( + `Failed to getting knowledge: ${error instanceof Error ? error.message : String(error)}` + ); + } + }, "getKnowledge"); + } + + async searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array; + match_threshold: number; + match_count: number; + searchText?: string; + }): Promise { + return this.withDatabase(async () => { + interface KnowledgeSearchRow { + id: UUID; + agentId: UUID; + content: string; + embedding: Buffer | null; + createdAt: string | number; + vector_score: number; + keyword_score: number; + combined_score: number; + } + try { + const cacheKey = `embedding_${params.agentId}_${params.searchText}`; + const cachedResult = await this.getCache({ + key: cacheKey, + agentId: params.agentId, + }); + + if (cachedResult) { + return JSON.parse(cachedResult); + } + + const vectorStr = `[${Array.from(params.embedding).join(",")}]`; + + const sql = ` + WITH vector_scores AS ( + SELECT id, + 1 - (embedding <-> $1::vector) as vector_score + FROM knowledge + WHERE ("agentId" IS NULL AND "isShared" = true) OR "agentId" = $2 + AND embedding IS NOT NULL + ), + keyword_matches AS ( + SELECT id, + CASE + WHEN content->>'text' ILIKE $3 THEN 3.0 + ELSE 1.0 + END * + CASE + WHEN (content->'metadata'->>'isChunk')::boolean = true THEN 1.5 + WHEN (content->'metadata'->>'isMain')::boolean = true THEN 1.2 + ELSE 1.0 + END as keyword_score + FROM knowledge + WHERE ("agentId" IS NULL AND "isShared" = true) OR "agentId" = $2 + ) + SELECT k.*, + v.vector_score, + kw.keyword_score, + (v.vector_score * kw.keyword_score) as combined_score + FROM knowledge k + JOIN vector_scores v ON k.id = v.id + LEFT JOIN keyword_matches kw ON k.id = kw.id + WHERE ("agentId" IS NULL AND "isShared" = true) OR k."agentId" = $2 + AND ( + v.vector_score >= $4 + OR (kw.keyword_score > 1.0 AND v.vector_score >= 0.3) + ) + ORDER BY combined_score DESC + LIMIT $5 + `; + + const { rows } = await this.query(sql, [ + vectorStr, + params.agentId, + `%${params.searchText || ""}%`, + params.match_threshold, + params.match_count, + ]); + + const results = rows.map((row) => ({ + id: row.id, + agentId: row.agentId, + content: + typeof row.content === "string" + ? JSON.parse(row.content) + : row.content, + embedding: row.embedding + ? new Float32Array(row.embedding) + : undefined, + createdAt: row.createdAt + ? new Date(row.createdAt).getTime() + : undefined, + similarity: row.combined_score, + })); + + await this.setCache({ + key: cacheKey, + agentId: params.agentId, + value: JSON.stringify(results), + }); + + return results; + } catch (error) { + elizaLogger.error("Error searching knowledge", { + error: + error instanceof Error ? error.message : String(error), + searchText: params.searchText, + agentId: params.agentId, + }); + throw new Error( + `Failed to search knowledge: ${error instanceof Error ? error.message : String(error)}` + ); + } + }, "searchKnowledge"); + } + + async createKnowledge(knowledge: RAGKnowledgeItem): Promise { + return this.withTransaction(async (tx) => { + try { + const sql = ` + INSERT INTO knowledge ( + id, "agentId", content, embedding, "createdAt", + "isMain", "originalId", "chunkIndex", "isShared" + ) VALUES ($1, $2, $3, $4, to_timestamp($5/1000.0), $6, $7, $8, $9) + ON CONFLICT (id) DO NOTHING + `; + + const metadata = knowledge.content.metadata || {}; + const vectorStr = knowledge.embedding + ? `[${Array.from(knowledge.embedding).join(",")}]` + : null; + + await tx.query(sql, [ + knowledge.id, + metadata.isShared ? null : knowledge.agentId, + knowledge.content, + vectorStr, + knowledge.createdAt || Date.now(), + metadata.isMain || false, + metadata.originalId || null, + metadata.chunkIndex || null, + metadata.isShared || false, + ]); + } catch (error) { + elizaLogger.error("Failed to create knowledge:", { + error: + error instanceof Error ? error.message : String(error), + }); + throw error; + } + }, "createKnowledge"); + } + + async removeKnowledge(id: UUID): Promise { + return await this.withTransaction(async (tx) => { + try { + await tx.query("DELETE FROM knowledge WHERE id = $1", [id]); + } catch (error) { + tx.rollback(); + elizaLogger.error("Error removing knowledge", { + error: + error instanceof Error ? error.message : String(error), + id, + }); + } + }, "removeKnowledge"); + } + + async clearKnowledge(agentId: UUID, shared?: boolean): Promise { + return await this.withTransaction(async (tx) => { + try { + const sql = shared + ? 'DELETE FROM knowledge WHERE ("agentId" = $1 OR "isShared" = true)' + : 'DELETE FROM knowledge WHERE "agentId" = $1'; + await tx.query(sql, [agentId]); + } catch (error) { + tx.rollback(); + elizaLogger.error("Error clearing knowledge", { + error: + error instanceof Error ? error.message : String(error), + agentId, + }); + } + }, "clearKnowledge"); + } } export default PGLiteDatabaseAdapter;