diff --git a/web/app/strategy/[id]/page.tsx b/web/app/r/[id]/page.tsx
similarity index 88%
rename from web/app/strategy/[id]/page.tsx
rename to web/app/r/[id]/page.tsx
index 1184c5b..d25a590 100644
--- a/web/app/strategy/[id]/page.tsx
+++ b/web/app/r/[id]/page.tsx
@@ -7,7 +7,6 @@ export default async function StrategyPage({
}: {
params: { id: string };
}) {
- const workerId = params.id;
const supabase = createSupabaseServerClient();
// // Fetch the runs for this worker
@@ -25,13 +24,13 @@ export default async function StrategyPage({
)
`
)
- .eq("worker_id", workerId)
+ .eq("id", params.id)
.order("created_at", { ascending: false })
.single();
if (runs.error || !runs.data) {
console.error(runs.error);
- throw Error(`Runs with worker_id ${workerId} not found.`);
+ throw Error(`Runs with id ${params.id} not found.`);
}
const data = runs.data.strategy_entries as unknown as StrategyWithProjects;
diff --git a/web/app/r/[id]/progress/page.tsx b/web/app/r/[id]/progress/page.tsx
new file mode 100644
index 0000000..c4e5cbc
--- /dev/null
+++ b/web/app/r/[id]/progress/page.tsx
@@ -0,0 +1,40 @@
+import Logs from "@/components/Logs";
+import TextField from "@/components/TextField";
+import { createSupabaseServerClient } from "@/utils/supabase-server";
+
+async function PromptField(props: { runId: string }) {
+ const supabase = createSupabaseServerClient()
+
+ const { data: run } = await supabase.from('runs').select(`
+ id,
+ prompt
+ `).eq("id", props.runId).single()
+
+ if (!run) {
+ throw new Error(`Run with ID '${props.runId}' not found`)
+ }
+
+ return
+}
+
+export default function ProgressPage(props: {
+ params: {
+ id: string
+ }
+}) {
+ return (
+
+ )
+}
\ No newline at end of file
diff --git a/web/components/LoadingCircle.tsx b/web/components/LoadingCircle.tsx
index 86ce915..c36a717 100644
--- a/web/components/LoadingCircle.tsx
+++ b/web/components/LoadingCircle.tsx
@@ -3,9 +3,10 @@ import clsx from "clsx";
interface LoadingCircleProps {
strokeWidth?: number;
className?: string;
+ hideText?: boolean
}
-const LoadingCircle = ({ strokeWidth = 12, className }: LoadingCircleProps) => {
+const LoadingCircle = ({ strokeWidth = 12, className, hideText }: LoadingCircleProps) => {
return (
- Loading...
+ { hideText ? <>> : Loading... }
);
};
diff --git a/web/components/Logs.tsx b/web/components/Logs.tsx
new file mode 100644
index 0000000..6e48727
--- /dev/null
+++ b/web/components/Logs.tsx
@@ -0,0 +1,30 @@
+"use server"
+
+import { createSupabaseServerClient } from "@/utils/supabase-server";
+import RealtimeLogs from "./RealtimeLogs";
+
+export default async function Logs(props: { runId: string }) {
+ const supabase = createSupabaseServerClient()
+
+ const { data: run } = await supabase.from('runs').select(`
+ id,
+ prompt,
+ logs(
+ id,
+ run_id,
+ created_at,
+ value,
+ ended_at,
+ status,
+ step_name
+ )
+ `).eq("id", props.runId).single()
+
+ if (!run) {
+ throw new Error(`Run with ID '${props.runId}' not found`)
+ }
+
+ return (
+
+ )
+}
\ No newline at end of file
diff --git a/web/components/Prompt.tsx b/web/components/Prompt.tsx
index 69de7df..498f92e 100644
--- a/web/components/Prompt.tsx
+++ b/web/components/Prompt.tsx
@@ -1,13 +1,10 @@
"use client";
-import { ChangeEvent, useEffect, useState } from "react";
+import { ChangeEvent, useState } from "react";
import TextField from "./TextField";
import ChatInputButton from "./ChatInputButton";
import { SparkleIcon } from "./Icons";
-import Image from "next/image";
import { startWorker } from "@/app/actions";
-import { createSupabaseBrowserClient } from "@/utils/supabase-browser";
-import { Tables } from "@/supabase/dbTypes";
import { useRouter } from "next/navigation";
import LoadingCircle from "./LoadingCircle";
import PromptInput from "./PromptInput";
@@ -25,60 +22,27 @@ const PROMPT_SUGESTIONS = [
export default function Prompt() {
const [prompt, setPrompt] = useState("");
const [isWaiting, setIsWaiting] = useState(false);
- const [workerId, setWorkerId] = useState();
- const [runId, setRunId] = useState();
- const [status, setStatus] = useState();
const router = useRouter();
- const supabase = createSupabaseBrowserClient();
const sendPrompt = async (prompt: string) => {
setIsWaiting(true);
try {
const response = await startWorker(prompt);
- setWorkerId(response.workerId);
- setRunId(response.runId);
+ router.push(`/r/${response.runId}/progress`)
} finally {
setIsWaiting(false);
}
};
- useEffect(() => {
- if (runId) {
- const channel = supabase
- .channel("logs-added")
- .on(
- "postgres_changes",
- {
- event: "INSERT",
- table: "logs",
- schema: "public",
- filter: `run_id=eq.${runId}`,
- },
- (payload: { new: Tables<"logs"> }) => {
- if (payload.new.message === "STRATEGY_CREATED") {
- router.push(`strategy/${workerId}`);
- return;
- }
- setStatus(payload.new.message);
- }
- )
- .subscribe();
-
- return () => {
- supabase.removeChannel(channel);
- };
- }
- }, [workerId, supabase, runId, workerId]);
-
return (
<>
- {status ? (
+ {isWaiting ? (
) : (
diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx
new file mode 100644
index 0000000..1030ba3
--- /dev/null
+++ b/web/components/RealtimeLogs.tsx
@@ -0,0 +1,142 @@
+"use client"
+
+import { Tables } from "@/supabase/dbTypes";
+import { createSupabaseBrowserClient } from "@/utils/supabase-browser";
+import clsx from "clsx";
+import { useRouter } from "next/navigation";
+import { useState, useEffect } from "react";
+import LoadingCircle from "./LoadingCircle";
+import Button from "./Button";
+
+const UNSTARTED_TEXTS: Record
["step_name"], string> = {
+ FETCH_PROJECTS: "Search for relevant projects",
+ EVALUATE_PROJECTS: "Evaluate proof of impact",
+ ANALYZE_FUNDING: "Analyze funding needs",
+ SYNTHESIZE_RESULTS: "Synthesize results",
+}
+
+const LOADING_TEXTS: Record["step_name"], string> = {
+ FETCH_PROJECTS: "Searching for relevant projects...",
+ EVALUATE_PROJECTS: "Evaluating proof of impact...",
+ ANALYZE_FUNDING: "Analyzing funding needs...",
+ SYNTHESIZE_RESULTS: "Synthesizing results...",
+}
+
+const STEPS_ORDER: Record["step_name"], number> = {
+ FETCH_PROJECTS: 1,
+ EVALUATE_PROJECTS: 2,
+ ANALYZE_FUNDING: 3,
+ SYNTHESIZE_RESULTS: 4,
+}
+
+const getLogMessage = (log: Tables<"logs">) => {
+ switch (log.status) {
+ case "NOT_STARTED": return UNSTARTED_TEXTS[log.step_name]
+ case "IN_PROGRESS": return LOADING_TEXTS[log.step_name]
+ case "COMPLETED": return log.value ?? `Completed: ${UNSTARTED_TEXTS[log.step_name]}`
+ case "ERRORED": return `Error while ${LOADING_TEXTS[log.step_name].toLowerCase()}`
+ }
+}
+
+const checkIfFinished = (logs: Tables<"logs">[]) => {
+ const sortedLogs = logs.sort((a, b) => {
+ return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name]
+ })
+ const lastStep = sortedLogs.slice(-1)[0];
+ const isFinished = lastStep.status === "COMPLETED" && lastStep.step_name === "SYNTHESIZE_RESULTS"
+
+ return isFinished
+}
+
+export default function RealtimeLogs(props: {
+ logs: Tables<"logs">[]
+ run: {
+ id: string;
+ prompt: string;
+ }
+}) {
+ const [logs, setLogs] = useState[]>(props.logs)
+ const supabase = createSupabaseBrowserClient();
+ const router = useRouter()
+
+ const sortedLogsWithSteps = logs.sort((a, b) => {
+ return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name]
+ })
+
+ const isFinished = checkIfFinished(sortedLogsWithSteps)
+
+ const navigateToStrategy = () => {
+ router.push(`./`)
+ }
+
+ useEffect(() => {
+ const channel = supabase
+ .channel("logs-added")
+ .on(
+ "postgres_changes",
+ {
+ event: "UPDATE",
+ table: "logs",
+ schema: "public",
+ filter: `run_id=eq.${props.run.id}`,
+ },
+ async () => {
+ const response = await supabase.from("logs").select(`
+ id,
+ run_id,
+ created_at,
+ value,
+ ended_at,
+ status,
+ step_name
+ `).eq("run_id", props.run.id)
+ const updatedLogs = response.data
+
+ if (!updatedLogs) {
+ throw new Error(`Logs for Run with ID '${props.run.id}' not found`)
+ }
+
+ setLogs([...updatedLogs])
+
+ if (checkIfFinished(updatedLogs)) {
+ navigateToStrategy()
+ return;
+ }
+ }
+ )
+ .subscribe()
+
+ return () => {
+ supabase.removeChannel(channel);
+ };
+ }, [supabase, props.run.id]);
+
+ return (
+ <>
+
+
Results:
+
+ { sortedLogsWithSteps.map(log => (
+
+ { log.status === "IN_PROGRESS" ?
: <>>}
+
{ getLogMessage(log) }
+
+
+ )) }
+
+
+
+ >
+ )
+}
\ No newline at end of file
diff --git a/web/supabase/dbTypes.ts b/web/supabase/dbTypes.ts
index a55af50..4003c34 100644
--- a/web/supabase/dbTypes.ts
+++ b/web/supabase/dbTypes.ts
@@ -9,24 +9,163 @@ export type Json =
export interface Database {
public: {
Tables: {
+ applications: {
+ Row: {
+ answers: Json | null
+ id: string
+ project_id: string
+ recipient: string
+ round: string
+ }
+ Insert: {
+ answers?: Json | null
+ id: string
+ project_id: string
+ recipient: string
+ round: string
+ }
+ Update: {
+ answers?: Json | null
+ id?: string
+ project_id?: string
+ recipient?: string
+ round?: string
+ }
+ Relationships: [
+ {
+ foreignKeyName: "applications_project_id_fkey"
+ columns: ["project_id"]
+ isOneToOne: false
+ referencedRelation: "projects"
+ referencedColumns: ["id"]
+ }
+ ]
+ }
+ gitcoin_applications: {
+ Row: {
+ created_at: string
+ data: Json
+ id: string
+ pointer: string
+ project_id: string
+ protocol: number
+ round_id: string
+ }
+ Insert: {
+ created_at?: string
+ data: Json
+ id: string
+ pointer: string
+ project_id: string
+ protocol: number
+ round_id: string
+ }
+ Update: {
+ created_at?: string
+ data?: Json
+ id?: string
+ pointer?: string
+ project_id?: string
+ protocol?: number
+ round_id?: string
+ }
+ Relationships: [
+ {
+ foreignKeyName: "gitcoin_applications_project_id_fkey"
+ columns: ["project_id"]
+ isOneToOne: false
+ referencedRelation: "gitcoin_projects"
+ referencedColumns: ["id"]
+ }
+ ]
+ }
+ gitcoin_indexing_jobs: {
+ Row: {
+ created_at: string
+ error: string | null
+ id: string
+ is_failed: boolean
+ is_running: boolean
+ last_updated_at: string
+ skip_projects: number
+ skip_rounds: number
+ url: string
+ }
+ Insert: {
+ created_at?: string
+ error?: string | null
+ id?: string
+ is_failed?: boolean
+ is_running?: boolean
+ last_updated_at?: string
+ skip_projects?: number
+ skip_rounds?: number
+ url: string
+ }
+ Update: {
+ created_at?: string
+ error?: string | null
+ id?: string
+ is_failed?: boolean
+ is_running?: boolean
+ last_updated_at?: string
+ skip_projects?: number
+ skip_rounds?: number
+ url?: string
+ }
+ Relationships: []
+ }
+ gitcoin_projects: {
+ Row: {
+ created_at: string
+ data: Json
+ id: string
+ pointer: string
+ protocol: number
+ }
+ Insert: {
+ created_at?: string
+ data: Json
+ id: string
+ pointer: string
+ protocol: number
+ }
+ Update: {
+ created_at?: string
+ data?: Json
+ id?: string
+ pointer?: string
+ protocol?: number
+ }
+ Relationships: []
+ }
logs: {
Row: {
created_at: string
+ ended_at: string | null
id: string
- message: string
run_id: string
+ status: Database["public"]["Enums"]["step_status"]
+ step_name: Database["public"]["Enums"]["step_name"]
+ value: string | null
}
Insert: {
created_at?: string
+ ended_at?: string | null
id?: string
- message: string
run_id: string
+ status: Database["public"]["Enums"]["step_status"]
+ step_name: Database["public"]["Enums"]["step_name"]
+ value?: string | null
}
Update: {
created_at?: string
+ ended_at?: string | null
id?: string
- message?: string
run_id?: string
+ status?: Database["public"]["Enums"]["step_status"]
+ step_name?: Database["public"]["Enums"]["step_name"]
+ value?: string | null
}
Relationships: [
{
@@ -42,21 +181,18 @@ export interface Database {
Row: {
description: string | null
id: string
- recipient: string | null
title: string | null
website: string | null
}
Insert: {
description?: string | null
- id?: string
- recipient?: string | null
+ id: string
title?: string | null
website?: string | null
}
Update: {
description?: string | null
id?: string
- recipient?: string | null
title?: string | null
website?: string | null
}
@@ -123,6 +259,13 @@ export interface Database {
weight?: number | null
}
Relationships: [
+ {
+ foreignKeyName: "strategy_entries_project_id_fkey"
+ columns: ["project_id"]
+ isOneToOne: false
+ referencedRelation: "projects"
+ referencedColumns: ["id"]
+ },
{
foreignKeyName: "strategy_entries_run_id_fkey"
columns: ["run_id"]
@@ -155,7 +298,12 @@ export interface Database {
[_ in never]: never
}
Enums: {
- [_ in never]: never
+ step_name:
+ | "FETCH_PROJECTS"
+ | "EVALUATE_PROJECTS"
+ | "ANALYZE_FUNDING"
+ | "SYNTHESIZE_RESULTS"
+ step_status: "NOT_STARTED" | "IN_PROGRESS" | "COMPLETED" | "ERRORED"
}
CompositeTypes: {
[_ in never]: never
diff --git a/web/supabase/migrations/20240126113522_logs.sql b/web/supabase/migrations/20240126113522_logs.sql
new file mode 100644
index 0000000..a7cacc6
--- /dev/null
+++ b/web/supabase/migrations/20240126113522_logs.sql
@@ -0,0 +1,14 @@
+
+create type "public"."step_name" as enum ('FETCH_PROJECTS', 'EVALUATE_PROJECTS', 'ANALYZE_FUNDING', 'SYNTHESIZE_RESULTS');
+
+create type "public"."step_status" as enum ('NOT_STARTED', 'IN_PROGRESS', 'COMPLETED', 'ERRORED');
+
+alter table "public"."logs" drop column "message";
+
+alter table "public"."logs" add column "ended_at" timestamp with time zone;
+
+alter table "public"."logs" add column "status" step_status not null;
+
+alter table "public"."logs" add column "step_name" step_name not null;
+
+alter table "public"."logs" add column "value" text;
\ No newline at end of file
diff --git a/workers/fund_public_goods/api/runs.py b/workers/fund_public_goods/api/runs.py
index 22800a4..15dd44e 100644
--- a/workers/fund_public_goods/api/runs.py
+++ b/workers/fund_public_goods/api/runs.py
@@ -31,7 +31,7 @@ async def runs(worker_id: str, params: Params) -> Response:
run_id = tables.runs.insert(supabase, worker_id, prompt)
await inngest_client.send(
- CreateStrategyEvent.Data(prompt=prompt, run_id=run_id).to_event()
+ CreateStrategyEvent.Data(run_id=run_id).to_event()
)
return Response(run_id=run_id)
diff --git a/workers/fund_public_goods/db/tables/logs.py b/workers/fund_public_goods/db/tables/logs.py
index bac80f8..f214527 100644
--- a/workers/fund_public_goods/db/tables/logs.py
+++ b/workers/fund_public_goods/db/tables/logs.py
@@ -1,11 +1,30 @@
+from typing import Literal
from supabase import Client
+import datetime
-def insert(
+def create(
db: Client,
run_id: str,
- message: str
+ step_name: str,
):
- db.table("logs").insert({
+ return db.table("logs").insert({
"run_id": run_id,
- "message": message
+ "step_name": step_name,
+ "status": "NOT_STARTED"
}).execute()
+
+def update(
+ db: Client,
+ log_id: str,
+ status: Literal["IN_PROGRESS", "COMPLETED", "ERRORED"],
+ value: str | None
+):
+ ended_at = None
+ if status == "COMPLETED" or status == "ERRORED":
+ ended_at = datetime.datetime.now().isoformat()
+
+ return db.table("logs").update({
+ "status": status,
+ "value": value,
+ "ended_at": ended_at
+ }).eq("id", log_id).execute()
\ No newline at end of file
diff --git a/workers/fund_public_goods/db/tables/projects.py b/workers/fund_public_goods/db/tables/projects.py
index dbd9b04..3b32c98 100644
--- a/workers/fund_public_goods/db/tables/projects.py
+++ b/workers/fund_public_goods/db/tables/projects.py
@@ -1,4 +1,6 @@
from typing import Any, Dict
+from fund_public_goods.lib.strategy.models.answer import Answer
+from fund_public_goods.lib.strategy.models.project import Project
from supabase import Client, PostgrestAPIResponse
import uuid
@@ -27,3 +29,30 @@ def get_projects(db: Client) -> PostgrestAPIResponse[Dict[str, Any]]:
)
.execute()
)
+
+def fetch_projects_data(supabase: Client) -> list[Project]:
+ response = get_projects(supabase)
+
+ projects: list[Project] = []
+
+ for item in response.data:
+ answers: list[Answer] = []
+
+ for application in item.get("applications", []):
+ for answer in application.get("answers", []):
+ answers.append(Answer(
+ question=answer.get("question", ""),
+ answer=answer.get("answer", None)
+ ))
+
+ project = Project(
+ id=item.get("id", ""),
+ title=item.get("title", ""),
+ description=item.get("description", ""),
+ website=item.get("website", ""),
+ answers=answers,
+ )
+
+ projects.append(project)
+
+ return projects
\ No newline at end of file
diff --git a/workers/fund_public_goods/db/tables/runs.py b/workers/fund_public_goods/db/tables/runs.py
index caa9f7a..f71d888 100644
--- a/workers/fund_public_goods/db/tables/runs.py
+++ b/workers/fund_public_goods/db/tables/runs.py
@@ -19,5 +19,5 @@ def get_prompt(db: Client, run_id: str) -> str:
.limit(1)
.single()
.execute()
- .data
+ .data["prompt"]
)
diff --git a/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py b/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py
index 19a3623..c5959f2 100644
--- a/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py
+++ b/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py
@@ -1,76 +1,13 @@
-from chromadb import EphemeralClient
+from fund_public_goods.lib.strategy.utils.utils import stringify_projects
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.output_parsers.json import JsonOutputParser
-from langchain_openai import OpenAIEmbeddings
-from langchain.vectorstores.chroma import Chroma
-from fund_public_goods.lib.strategy.utils.generate_queries import generate_queries
from fund_public_goods.lib.strategy.models.evaluated_project import EvaluatedProject
from fund_public_goods.lib.strategy.models.project import Project
from fund_public_goods.lib.strategy.models.project_evaluation import ProjectEvaluation
-def stringify_projects(projects: list[Project], separator: str) -> str:
- project_strings = []
-
- for project in projects:
- project_str = get_project_text(project=project)
- project_strings.append(project_str)
-
- return separator.join(project_strings)
-
-def get_project_text(project: Project) -> str:
- result = f"ID: {project.id} - Description: {project.description}\n"
-
- for answer in project.answers:
- result += f" Question: {answer.question}\n"
- result += f" Answer: {answer.answer}\n"
-
- return result
-
-def remove_duplicate_projects(projects: list[Project]) -> list[Project]:
- seen = {}
- unique_projects = []
-
- for project in projects:
- if project.id not in seen:
- unique_projects.append(project)
- seen[project.id] = True
-
- return unique_projects
-
-def get_top_matching_projects(prompt: str, projects: list[Project]) -> list[Project]:
- projects_by_id = {project.id: project for project in projects}
- queries = generate_queries(prompt=prompt, n=3)
- texts: list[str] = []
- metadatas: list[dict] = []
-
- for project in projects:
- project_text = get_project_text(project=project)
- texts.append(project_text)
- metadatas.append({ "id": project["id"] })
-
- db_client = EphemeralClient()
- collection = Chroma.from_texts(
- texts=texts,
- metadatas=metadatas,
- embedding=OpenAIEmbeddings(),
- client=db_client,
- collection_name="projects"
- )
-
- top_matches: list[Project] = []
-
- for query in queries:
- matches = collection.similarity_search(query, k=5)
-
- for match in matches:
- matched_project = projects_by_id[match.metadata["id"]]
- top_matches.append(matched_project)
-
- return remove_duplicate_projects(top_matches)
-
extract_evaluations_prompts_template = """
You will be given a list of project evaluations that measure how well each project
matches the user's interest, and its impact in regards to that interest.
@@ -139,7 +76,6 @@ def extract_project_evaluations(evaluation_report: str) -> list[ProjectEvaluatio
def evaluate_projects(prompt: str, projects: list[Project]) -> list[EvaluatedProject]:
projects_by_id = {project.id: project for project in projects}
- top_matching_projects = get_top_matching_projects(prompt=prompt, projects=projects)
evaluation_prompt = ChatPromptTemplate.from_messages([
("system", evaluation_prompt_template),
@@ -154,7 +90,7 @@ def evaluate_projects(prompt: str, projects: list[Project]) -> list[EvaluatedPro
evaluation_report = evaluation_chain.invoke({
"prompt": prompt,
"separator": separator,
- "projects": stringify_projects(projects=top_matching_projects, separator=separator)
+ "projects": stringify_projects(projects=projects, separator=separator)
})
evaluations = extract_project_evaluations(evaluation_report=evaluation_report)
diff --git a/workers/fund_public_goods/lib/strategy/utils/generate_queries.py b/workers/fund_public_goods/lib/strategy/utils/generate_queries.py
index bbd62ce..1d3c0c7 100644
--- a/workers/fund_public_goods/lib/strategy/utils/generate_queries.py
+++ b/workers/fund_public_goods/lib/strategy/utils/generate_queries.py
@@ -2,6 +2,7 @@
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import CommaSeparatedListOutputParser
+
queries_prompt_template = """
Your goal is to provide a list of queries that will be used to perform
and embeddings search over different project descriptions and get the ones
diff --git a/workers/fund_public_goods/lib/strategy/utils/get_top_matching_projects.py b/workers/fund_public_goods/lib/strategy/utils/get_top_matching_projects.py
new file mode 100644
index 0000000..253dbda
--- /dev/null
+++ b/workers/fund_public_goods/lib/strategy/utils/get_top_matching_projects.py
@@ -0,0 +1,38 @@
+from chromadb import EphemeralClient
+from fund_public_goods.lib.strategy.models.project import Project
+from fund_public_goods.lib.strategy.utils.generate_queries import generate_queries
+from fund_public_goods.lib.strategy.utils.utils import get_project_text, remove_duplicate_projects
+from langchain_openai import OpenAIEmbeddings
+from langchain.vectorstores.chroma import Chroma
+
+
+def get_top_matching_projects(prompt: str, projects: list[Project]) -> list[Project]:
+ projects_by_id = {project.id: project for project in projects}
+ queries = generate_queries(prompt=prompt, n=3)
+ texts: list[str] = []
+ metadatas: list[dict] = []
+
+ for project in projects:
+ project_text = get_project_text(project=project)
+ texts.append(project_text)
+ metadatas.append({ "id": project["id"] })
+
+ db_client = EphemeralClient()
+ collection = Chroma.from_texts(
+ texts=texts,
+ metadatas=metadatas,
+ embedding=OpenAIEmbeddings(),
+ client=db_client,
+ collection_name="projects"
+ )
+
+ top_matches: list[Project] = []
+
+ for query in queries:
+ matches = collection.similarity_search(query, k=5)
+
+ for match in matches:
+ matched_project = projects_by_id[match.metadata["id"]]
+ top_matches.append(matched_project)
+
+ return remove_duplicate_projects(top_matches)
\ No newline at end of file
diff --git a/workers/fund_public_goods/lib/strategy/utils/utils.py b/workers/fund_public_goods/lib/strategy/utils/utils.py
new file mode 100644
index 0000000..4cd7a31
--- /dev/null
+++ b/workers/fund_public_goods/lib/strategy/utils/utils.py
@@ -0,0 +1,33 @@
+from fund_public_goods.lib.strategy.models.project import Project
+
+
+def stringify_projects(projects: list[Project], separator: str) -> str:
+ project_strings = []
+
+ for project in projects:
+ project_str = get_project_text(project=project)
+ project_strings.append(project_str)
+
+ return separator.join(project_strings)
+
+
+def get_project_text(project: Project) -> str:
+ result = f"ID: {project.id} - Description: {project.description}\n"
+
+ for answer in project.answers:
+ result += f" Question: {answer.question}\n"
+ result += f" Answer: {answer.answer}\n"
+
+ return result
+
+
+def remove_duplicate_projects(projects: list[Project]) -> list[Project]:
+ seen = {}
+ unique_projects = []
+
+ for project in projects:
+ if project.id not in seen:
+ unique_projects.append(project)
+ seen[project.id] = True
+
+ return unique_projects
\ No newline at end of file
diff --git a/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py b/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py
index ff4e688..936e889 100644
--- a/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py
+++ b/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py
@@ -1,3 +1,6 @@
+import json
+import typing
+from fund_public_goods.lib.strategy.utils.get_top_matching_projects import get_top_matching_projects
import inngest
from supabase import Client
from fund_public_goods.lib.strategy.utils.assign_weights import assign_weights
@@ -9,40 +12,37 @@
)
from fund_public_goods.lib.strategy.models.project import Project
from fund_public_goods.lib.strategy.models.weighted_project import WeightedProject
-from fund_public_goods.db.tables.projects import get_projects
+from fund_public_goods.db.tables.projects import fetch_projects_data
from fund_public_goods.db.tables.runs import get_prompt
from fund_public_goods.db.tables.strategy_entries import insert_multiple
from fund_public_goods.db import client, logs
from fund_public_goods.workflows.create_strategy.events import CreateStrategyEvent
-def fetch_projects_data(supabase: Client) -> list[Project]:
- response = get_projects(supabase)
- projects = []
+StepNames = typing.Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"]
+step_names: list[StepNames] = ["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"]
- for item in response.data:
- answers = []
- for application in item.get("applications", []):
- for answer in application.get("answers", []):
- answers.append(
- {
- "question": answer.get("question", ""),
- "answer": answer.get("answer", None),
- }
- )
+def fetch_matching_projects(supabase: Client, prompt: str):
+ projects = fetch_projects_data(supabase)
+ matching_projects = get_top_matching_projects(prompt, projects)
+
+ return [project.model_dump() for project in matching_projects]
- project = Project(
- id=item.get("id", ""),
- title=item.get("title", ""),
- description=item.get("description", ""),
- website=item.get("website", ""),
- answers=answers,
- )
- projects.append(project)
- return projects
+def initialize_logs(supabase: Client, run_id: str) -> str:
+ log_ids: dict[StepNames, str] = {}
+
+ for step_name in step_names:
+ new_log = logs.create(
+ db=supabase,
+ run_id=run_id,
+ step_name=step_name,
+ ).data
+
+ log_ids[step_name] = new_log[0]["id"]
+ return json.dumps(log_ids)
@inngest.create_function(
fn_id="create_strategy",
@@ -55,31 +55,52 @@ async def create_strategy(
data = CreateStrategyEvent.Data.model_validate(ctx.event.data)
run_id = data.run_id
supabase = client.create_admin()
-
- await step.run(
- "extracting_prompt",
- lambda: logs.insert(supabase, run_id, "Extracting prompt from run_id"),
+
+ prompt = await step.run(
+ "extract_prompt",
+ lambda: get_prompt(supabase, run_id),
)
-
- prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id))
-
+
+ log_ids_str = await step.run(
+ "initialize_logs",
+ lambda: initialize_logs(supabase, run_id),
+ )
+
+ log_ids: dict[StepNames, str] = json.loads(log_ids_str)
+
await step.run(
- "fetching_projects_info",
- lambda: logs.insert(supabase, run_id, "Getting information from data sources"),
+ "start_fetch_projects_data",
+ lambda: logs.update(
+ db=supabase,
+ status="IN_PROGRESS",
+ log_id=log_ids["FETCH_PROJECTS"],
+ value=None,
+ ),
)
json_projects = await step.run(
- "fetch_projects_data", lambda: fetch_projects_data(supabase)
+ "fetch_projects_data", lambda: fetch_matching_projects(supabase, prompt)
)
- projects: list[Project] = [Project(**json_project) for json_project in json_projects] # type: ignore
-
+ projects = [Project(**json_project) for json_project in json_projects]
+
await step.run(
- "assessing",
- lambda: logs.insert(
- supabase,
- run_id,
- "Assessing impact of each project related to the users interest",
+ "completed_fetch_projects_data",
+ lambda: logs.update(
+ db=supabase,
+ status="COMPLETED",
+ log_id=log_ids["FETCH_PROJECTS"],
+ value=f"Found {len(projects)} projects related to '{prompt}'",
+ ),
+ )
+
+ await step.run(
+ "start_assess_projects",
+ lambda: logs.update(
+ db=supabase,
+ status="IN_PROGRESS",
+ log_id=log_ids["EVALUATE_PROJECTS"],
+ value=None,
),
)
@@ -87,13 +108,24 @@ async def create_strategy(
"assess_projects", lambda: evaluate_projects(prompt, projects)
)
assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore
-
+
await step.run(
- "determining_funding",
- lambda: logs.insert(
+ "completed_assess_projects",
+ lambda: logs.update(
+ db=supabase,
+ status="COMPLETED",
+ log_id=log_ids["EVALUATE_PROJECTS"],
+ value=f"Evaluated {len(assessed_projects)} projects",
+ ),
+ )
+
+ await step.run(
+ "start_determine_funding",
+ lambda: logs.update(
supabase,
- run_id,
- "Determining the relative funding that the best matching projects need",
+ status="IN_PROGRESS",
+ log_id=log_ids["ANALYZE_FUNDING"],
+ value=None,
),
)
@@ -101,16 +133,39 @@ async def create_strategy(
"determine_funding", lambda: assign_weights(assessed_projects)
)
weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore
-
+
await step.run(
- "saving_results_to_db",
- lambda: logs.insert(supabase, run_id, "Generating results"),
+ "completed_determine_funding",
+ lambda: logs.update(
+ supabase,
+ status="COMPLETED",
+ log_id=log_ids["ANALYZE_FUNDING"],
+ value="Determined the relative funding that the best matching projects need",
+ ),
+ )
+
+ await step.run(
+ "start_synthesize_results",
+ lambda: logs.update(
+ supabase,
+ status="IN_PROGRESS",
+ log_id=log_ids["SYNTHESIZE_RESULTS"],
+ value=None
+ ),
)
await step.run(
"save_strategy_to_db", lambda: insert_multiple(supabase, run_id, weighted_projects)
)
-
- await step.run("result", lambda: logs.insert(supabase, run_id, "STRATEGY_CREATED"))
+
+ await step.run(
+ "completed_synthesize_results",
+ lambda: logs.update(
+ supabase,
+ status="COMPLETED",
+ log_id=log_ids["SYNTHESIZE_RESULTS"],
+ value="Results generated"
+ ),
+ )
return "done"