diff --git a/web/app/strategy/[id]/page.tsx b/web/app/r/[id]/page.tsx similarity index 88% rename from web/app/strategy/[id]/page.tsx rename to web/app/r/[id]/page.tsx index 1184c5b..d25a590 100644 --- a/web/app/strategy/[id]/page.tsx +++ b/web/app/r/[id]/page.tsx @@ -7,7 +7,6 @@ export default async function StrategyPage({ }: { params: { id: string }; }) { - const workerId = params.id; const supabase = createSupabaseServerClient(); // // Fetch the runs for this worker @@ -25,13 +24,13 @@ export default async function StrategyPage({ ) ` ) - .eq("worker_id", workerId) + .eq("id", params.id) .order("created_at", { ascending: false }) .single(); if (runs.error || !runs.data) { console.error(runs.error); - throw Error(`Runs with worker_id ${workerId} not found.`); + throw Error(`Runs with id ${params.id} not found.`); } const data = runs.data.strategy_entries as unknown as StrategyWithProjects; diff --git a/web/app/r/[id]/progress/page.tsx b/web/app/r/[id]/progress/page.tsx new file mode 100644 index 0000000..c4e5cbc --- /dev/null +++ b/web/app/r/[id]/progress/page.tsx @@ -0,0 +1,40 @@ +import Logs from "@/components/Logs"; +import TextField from "@/components/TextField"; +import { createSupabaseServerClient } from "@/utils/supabase-server"; + +async function PromptField(props: { runId: string }) { + const supabase = createSupabaseServerClient() + + const { data: run } = await supabase.from('runs').select(` + id, + prompt + `).eq("id", props.runId).single() + + if (!run) { + throw new Error(`Run with ID '${props.runId}' not found`) + } + + return +} + +export default function ProgressPage(props: { + params: { + id: string + } +}) { + return ( +
+
+
+ +
+
+ +
+
+ ) +} \ No newline at end of file diff --git a/web/components/LoadingCircle.tsx b/web/components/LoadingCircle.tsx index 86ce915..c36a717 100644 --- a/web/components/LoadingCircle.tsx +++ b/web/components/LoadingCircle.tsx @@ -3,9 +3,10 @@ import clsx from "clsx"; interface LoadingCircleProps { strokeWidth?: number; className?: string; + hideText?: boolean } -const LoadingCircle = ({ strokeWidth = 12, className }: LoadingCircleProps) => { +const LoadingCircle = ({ strokeWidth = 12, className, hideText }: LoadingCircleProps) => { return (
{ strokeLinecap="round" /> - Loading... + { hideText ? <> : Loading... }
); }; diff --git a/web/components/Logs.tsx b/web/components/Logs.tsx new file mode 100644 index 0000000..6e48727 --- /dev/null +++ b/web/components/Logs.tsx @@ -0,0 +1,30 @@ +"use server" + +import { createSupabaseServerClient } from "@/utils/supabase-server"; +import RealtimeLogs from "./RealtimeLogs"; + +export default async function Logs(props: { runId: string }) { + const supabase = createSupabaseServerClient() + + const { data: run } = await supabase.from('runs').select(` + id, + prompt, + logs( + id, + run_id, + created_at, + value, + ended_at, + status, + step_name + ) + `).eq("id", props.runId).single() + + if (!run) { + throw new Error(`Run with ID '${props.runId}' not found`) + } + + return ( + + ) +} \ No newline at end of file diff --git a/web/components/Prompt.tsx b/web/components/Prompt.tsx index 69de7df..498f92e 100644 --- a/web/components/Prompt.tsx +++ b/web/components/Prompt.tsx @@ -1,13 +1,10 @@ "use client"; -import { ChangeEvent, useEffect, useState } from "react"; +import { ChangeEvent, useState } from "react"; import TextField from "./TextField"; import ChatInputButton from "./ChatInputButton"; import { SparkleIcon } from "./Icons"; -import Image from "next/image"; import { startWorker } from "@/app/actions"; -import { createSupabaseBrowserClient } from "@/utils/supabase-browser"; -import { Tables } from "@/supabase/dbTypes"; import { useRouter } from "next/navigation"; import LoadingCircle from "./LoadingCircle"; import PromptInput from "./PromptInput"; @@ -25,60 +22,27 @@ const PROMPT_SUGESTIONS = [ export default function Prompt() { const [prompt, setPrompt] = useState(""); const [isWaiting, setIsWaiting] = useState(false); - const [workerId, setWorkerId] = useState(); - const [runId, setRunId] = useState(); - const [status, setStatus] = useState(); const router = useRouter(); - const supabase = createSupabaseBrowserClient(); const sendPrompt = async (prompt: string) => { setIsWaiting(true); try { const response = await startWorker(prompt); - setWorkerId(response.workerId); - setRunId(response.runId); + router.push(`/r/${response.runId}/progress`) } finally { setIsWaiting(false); } }; - useEffect(() => { - if (runId) { - const channel = supabase - .channel("logs-added") - .on( - "postgres_changes", - { - event: "INSERT", - table: "logs", - schema: "public", - filter: `run_id=eq.${runId}`, - }, - (payload: { new: Tables<"logs"> }) => { - if (payload.new.message === "STRATEGY_CREATED") { - router.push(`strategy/${workerId}`); - return; - } - setStatus(payload.new.message); - } - ) - .subscribe(); - - return () => { - supabase.removeChannel(channel); - }; - } - }, [workerId, supabase, runId, workerId]); - return ( <>
- {status ? ( + {isWaiting ? (
-
{status}
+
Loading
) : ( diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx new file mode 100644 index 0000000..1030ba3 --- /dev/null +++ b/web/components/RealtimeLogs.tsx @@ -0,0 +1,142 @@ +"use client" + +import { Tables } from "@/supabase/dbTypes"; +import { createSupabaseBrowserClient } from "@/utils/supabase-browser"; +import clsx from "clsx"; +import { useRouter } from "next/navigation"; +import { useState, useEffect } from "react"; +import LoadingCircle from "./LoadingCircle"; +import Button from "./Button"; + +const UNSTARTED_TEXTS: Record["step_name"], string> = { + FETCH_PROJECTS: "Search for relevant projects", + EVALUATE_PROJECTS: "Evaluate proof of impact", + ANALYZE_FUNDING: "Analyze funding needs", + SYNTHESIZE_RESULTS: "Synthesize results", +} + +const LOADING_TEXTS: Record["step_name"], string> = { + FETCH_PROJECTS: "Searching for relevant projects...", + EVALUATE_PROJECTS: "Evaluating proof of impact...", + ANALYZE_FUNDING: "Analyzing funding needs...", + SYNTHESIZE_RESULTS: "Synthesizing results...", +} + +const STEPS_ORDER: Record["step_name"], number> = { + FETCH_PROJECTS: 1, + EVALUATE_PROJECTS: 2, + ANALYZE_FUNDING: 3, + SYNTHESIZE_RESULTS: 4, +} + +const getLogMessage = (log: Tables<"logs">) => { + switch (log.status) { + case "NOT_STARTED": return UNSTARTED_TEXTS[log.step_name] + case "IN_PROGRESS": return LOADING_TEXTS[log.step_name] + case "COMPLETED": return log.value ?? `Completed: ${UNSTARTED_TEXTS[log.step_name]}` + case "ERRORED": return `Error while ${LOADING_TEXTS[log.step_name].toLowerCase()}` + } +} + +const checkIfFinished = (logs: Tables<"logs">[]) => { + const sortedLogs = logs.sort((a, b) => { + return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name] + }) + const lastStep = sortedLogs.slice(-1)[0]; + const isFinished = lastStep.status === "COMPLETED" && lastStep.step_name === "SYNTHESIZE_RESULTS" + + return isFinished +} + +export default function RealtimeLogs(props: { + logs: Tables<"logs">[] + run: { + id: string; + prompt: string; + } +}) { + const [logs, setLogs] = useState[]>(props.logs) + const supabase = createSupabaseBrowserClient(); + const router = useRouter() + + const sortedLogsWithSteps = logs.sort((a, b) => { + return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name] + }) + + const isFinished = checkIfFinished(sortedLogsWithSteps) + + const navigateToStrategy = () => { + router.push(`./`) + } + + useEffect(() => { + const channel = supabase + .channel("logs-added") + .on( + "postgres_changes", + { + event: "UPDATE", + table: "logs", + schema: "public", + filter: `run_id=eq.${props.run.id}`, + }, + async () => { + const response = await supabase.from("logs").select(` + id, + run_id, + created_at, + value, + ended_at, + status, + step_name + `).eq("run_id", props.run.id) + const updatedLogs = response.data + + if (!updatedLogs) { + throw new Error(`Logs for Run with ID '${props.run.id}' not found`) + } + + setLogs([...updatedLogs]) + + if (checkIfFinished(updatedLogs)) { + navigateToStrategy() + return; + } + } + ) + .subscribe() + + return () => { + supabase.removeChannel(channel); + }; + }, [supabase, props.run.id]); + + return ( + <> +
+

Results:

+
+ { sortedLogsWithSteps.map(log => ( +
+ { log.status === "IN_PROGRESS" ? : <>} +

{ getLogMessage(log) }

+
+
+ )) } +
+
+ + + ) +} \ No newline at end of file diff --git a/web/supabase/dbTypes.ts b/web/supabase/dbTypes.ts index a55af50..4003c34 100644 --- a/web/supabase/dbTypes.ts +++ b/web/supabase/dbTypes.ts @@ -9,24 +9,163 @@ export type Json = export interface Database { public: { Tables: { + applications: { + Row: { + answers: Json | null + id: string + project_id: string + recipient: string + round: string + } + Insert: { + answers?: Json | null + id: string + project_id: string + recipient: string + round: string + } + Update: { + answers?: Json | null + id?: string + project_id?: string + recipient?: string + round?: string + } + Relationships: [ + { + foreignKeyName: "applications_project_id_fkey" + columns: ["project_id"] + isOneToOne: false + referencedRelation: "projects" + referencedColumns: ["id"] + } + ] + } + gitcoin_applications: { + Row: { + created_at: string + data: Json + id: string + pointer: string + project_id: string + protocol: number + round_id: string + } + Insert: { + created_at?: string + data: Json + id: string + pointer: string + project_id: string + protocol: number + round_id: string + } + Update: { + created_at?: string + data?: Json + id?: string + pointer?: string + project_id?: string + protocol?: number + round_id?: string + } + Relationships: [ + { + foreignKeyName: "gitcoin_applications_project_id_fkey" + columns: ["project_id"] + isOneToOne: false + referencedRelation: "gitcoin_projects" + referencedColumns: ["id"] + } + ] + } + gitcoin_indexing_jobs: { + Row: { + created_at: string + error: string | null + id: string + is_failed: boolean + is_running: boolean + last_updated_at: string + skip_projects: number + skip_rounds: number + url: string + } + Insert: { + created_at?: string + error?: string | null + id?: string + is_failed?: boolean + is_running?: boolean + last_updated_at?: string + skip_projects?: number + skip_rounds?: number + url: string + } + Update: { + created_at?: string + error?: string | null + id?: string + is_failed?: boolean + is_running?: boolean + last_updated_at?: string + skip_projects?: number + skip_rounds?: number + url?: string + } + Relationships: [] + } + gitcoin_projects: { + Row: { + created_at: string + data: Json + id: string + pointer: string + protocol: number + } + Insert: { + created_at?: string + data: Json + id: string + pointer: string + protocol: number + } + Update: { + created_at?: string + data?: Json + id?: string + pointer?: string + protocol?: number + } + Relationships: [] + } logs: { Row: { created_at: string + ended_at: string | null id: string - message: string run_id: string + status: Database["public"]["Enums"]["step_status"] + step_name: Database["public"]["Enums"]["step_name"] + value: string | null } Insert: { created_at?: string + ended_at?: string | null id?: string - message: string run_id: string + status: Database["public"]["Enums"]["step_status"] + step_name: Database["public"]["Enums"]["step_name"] + value?: string | null } Update: { created_at?: string + ended_at?: string | null id?: string - message?: string run_id?: string + status?: Database["public"]["Enums"]["step_status"] + step_name?: Database["public"]["Enums"]["step_name"] + value?: string | null } Relationships: [ { @@ -42,21 +181,18 @@ export interface Database { Row: { description: string | null id: string - recipient: string | null title: string | null website: string | null } Insert: { description?: string | null - id?: string - recipient?: string | null + id: string title?: string | null website?: string | null } Update: { description?: string | null id?: string - recipient?: string | null title?: string | null website?: string | null } @@ -123,6 +259,13 @@ export interface Database { weight?: number | null } Relationships: [ + { + foreignKeyName: "strategy_entries_project_id_fkey" + columns: ["project_id"] + isOneToOne: false + referencedRelation: "projects" + referencedColumns: ["id"] + }, { foreignKeyName: "strategy_entries_run_id_fkey" columns: ["run_id"] @@ -155,7 +298,12 @@ export interface Database { [_ in never]: never } Enums: { - [_ in never]: never + step_name: + | "FETCH_PROJECTS" + | "EVALUATE_PROJECTS" + | "ANALYZE_FUNDING" + | "SYNTHESIZE_RESULTS" + step_status: "NOT_STARTED" | "IN_PROGRESS" | "COMPLETED" | "ERRORED" } CompositeTypes: { [_ in never]: never diff --git a/web/supabase/migrations/20240126113522_logs.sql b/web/supabase/migrations/20240126113522_logs.sql new file mode 100644 index 0000000..a7cacc6 --- /dev/null +++ b/web/supabase/migrations/20240126113522_logs.sql @@ -0,0 +1,14 @@ + +create type "public"."step_name" as enum ('FETCH_PROJECTS', 'EVALUATE_PROJECTS', 'ANALYZE_FUNDING', 'SYNTHESIZE_RESULTS'); + +create type "public"."step_status" as enum ('NOT_STARTED', 'IN_PROGRESS', 'COMPLETED', 'ERRORED'); + +alter table "public"."logs" drop column "message"; + +alter table "public"."logs" add column "ended_at" timestamp with time zone; + +alter table "public"."logs" add column "status" step_status not null; + +alter table "public"."logs" add column "step_name" step_name not null; + +alter table "public"."logs" add column "value" text; \ No newline at end of file diff --git a/workers/fund_public_goods/api/runs.py b/workers/fund_public_goods/api/runs.py index 22800a4..15dd44e 100644 --- a/workers/fund_public_goods/api/runs.py +++ b/workers/fund_public_goods/api/runs.py @@ -31,7 +31,7 @@ async def runs(worker_id: str, params: Params) -> Response: run_id = tables.runs.insert(supabase, worker_id, prompt) await inngest_client.send( - CreateStrategyEvent.Data(prompt=prompt, run_id=run_id).to_event() + CreateStrategyEvent.Data(run_id=run_id).to_event() ) return Response(run_id=run_id) diff --git a/workers/fund_public_goods/db/tables/logs.py b/workers/fund_public_goods/db/tables/logs.py index bac80f8..f214527 100644 --- a/workers/fund_public_goods/db/tables/logs.py +++ b/workers/fund_public_goods/db/tables/logs.py @@ -1,11 +1,30 @@ +from typing import Literal from supabase import Client +import datetime -def insert( +def create( db: Client, run_id: str, - message: str + step_name: str, ): - db.table("logs").insert({ + return db.table("logs").insert({ "run_id": run_id, - "message": message + "step_name": step_name, + "status": "NOT_STARTED" }).execute() + +def update( + db: Client, + log_id: str, + status: Literal["IN_PROGRESS", "COMPLETED", "ERRORED"], + value: str | None +): + ended_at = None + if status == "COMPLETED" or status == "ERRORED": + ended_at = datetime.datetime.now().isoformat() + + return db.table("logs").update({ + "status": status, + "value": value, + "ended_at": ended_at + }).eq("id", log_id).execute() \ No newline at end of file diff --git a/workers/fund_public_goods/db/tables/projects.py b/workers/fund_public_goods/db/tables/projects.py index dbd9b04..3b32c98 100644 --- a/workers/fund_public_goods/db/tables/projects.py +++ b/workers/fund_public_goods/db/tables/projects.py @@ -1,4 +1,6 @@ from typing import Any, Dict +from fund_public_goods.lib.strategy.models.answer import Answer +from fund_public_goods.lib.strategy.models.project import Project from supabase import Client, PostgrestAPIResponse import uuid @@ -27,3 +29,30 @@ def get_projects(db: Client) -> PostgrestAPIResponse[Dict[str, Any]]: ) .execute() ) + +def fetch_projects_data(supabase: Client) -> list[Project]: + response = get_projects(supabase) + + projects: list[Project] = [] + + for item in response.data: + answers: list[Answer] = [] + + for application in item.get("applications", []): + for answer in application.get("answers", []): + answers.append(Answer( + question=answer.get("question", ""), + answer=answer.get("answer", None) + )) + + project = Project( + id=item.get("id", ""), + title=item.get("title", ""), + description=item.get("description", ""), + website=item.get("website", ""), + answers=answers, + ) + + projects.append(project) + + return projects \ No newline at end of file diff --git a/workers/fund_public_goods/db/tables/runs.py b/workers/fund_public_goods/db/tables/runs.py index caa9f7a..f71d888 100644 --- a/workers/fund_public_goods/db/tables/runs.py +++ b/workers/fund_public_goods/db/tables/runs.py @@ -19,5 +19,5 @@ def get_prompt(db: Client, run_id: str) -> str: .limit(1) .single() .execute() - .data + .data["prompt"] ) diff --git a/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py b/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py index 19a3623..c5959f2 100644 --- a/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py +++ b/workers/fund_public_goods/lib/strategy/utils/evaluate_projects.py @@ -1,76 +1,13 @@ -from chromadb import EphemeralClient +from fund_public_goods.lib.strategy.utils.utils import stringify_projects from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.output_parsers.json import JsonOutputParser -from langchain_openai import OpenAIEmbeddings -from langchain.vectorstores.chroma import Chroma -from fund_public_goods.lib.strategy.utils.generate_queries import generate_queries from fund_public_goods.lib.strategy.models.evaluated_project import EvaluatedProject from fund_public_goods.lib.strategy.models.project import Project from fund_public_goods.lib.strategy.models.project_evaluation import ProjectEvaluation -def stringify_projects(projects: list[Project], separator: str) -> str: - project_strings = [] - - for project in projects: - project_str = get_project_text(project=project) - project_strings.append(project_str) - - return separator.join(project_strings) - -def get_project_text(project: Project) -> str: - result = f"ID: {project.id} - Description: {project.description}\n" - - for answer in project.answers: - result += f" Question: {answer.question}\n" - result += f" Answer: {answer.answer}\n" - - return result - -def remove_duplicate_projects(projects: list[Project]) -> list[Project]: - seen = {} - unique_projects = [] - - for project in projects: - if project.id not in seen: - unique_projects.append(project) - seen[project.id] = True - - return unique_projects - -def get_top_matching_projects(prompt: str, projects: list[Project]) -> list[Project]: - projects_by_id = {project.id: project for project in projects} - queries = generate_queries(prompt=prompt, n=3) - texts: list[str] = [] - metadatas: list[dict] = [] - - for project in projects: - project_text = get_project_text(project=project) - texts.append(project_text) - metadatas.append({ "id": project["id"] }) - - db_client = EphemeralClient() - collection = Chroma.from_texts( - texts=texts, - metadatas=metadatas, - embedding=OpenAIEmbeddings(), - client=db_client, - collection_name="projects" - ) - - top_matches: list[Project] = [] - - for query in queries: - matches = collection.similarity_search(query, k=5) - - for match in matches: - matched_project = projects_by_id[match.metadata["id"]] - top_matches.append(matched_project) - - return remove_duplicate_projects(top_matches) - extract_evaluations_prompts_template = """ You will be given a list of project evaluations that measure how well each project matches the user's interest, and its impact in regards to that interest. @@ -139,7 +76,6 @@ def extract_project_evaluations(evaluation_report: str) -> list[ProjectEvaluatio def evaluate_projects(prompt: str, projects: list[Project]) -> list[EvaluatedProject]: projects_by_id = {project.id: project for project in projects} - top_matching_projects = get_top_matching_projects(prompt=prompt, projects=projects) evaluation_prompt = ChatPromptTemplate.from_messages([ ("system", evaluation_prompt_template), @@ -154,7 +90,7 @@ def evaluate_projects(prompt: str, projects: list[Project]) -> list[EvaluatedPro evaluation_report = evaluation_chain.invoke({ "prompt": prompt, "separator": separator, - "projects": stringify_projects(projects=top_matching_projects, separator=separator) + "projects": stringify_projects(projects=projects, separator=separator) }) evaluations = extract_project_evaluations(evaluation_report=evaluation_report) diff --git a/workers/fund_public_goods/lib/strategy/utils/generate_queries.py b/workers/fund_public_goods/lib/strategy/utils/generate_queries.py index bbd62ce..1d3c0c7 100644 --- a/workers/fund_public_goods/lib/strategy/utils/generate_queries.py +++ b/workers/fund_public_goods/lib/strategy/utils/generate_queries.py @@ -2,6 +2,7 @@ from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import CommaSeparatedListOutputParser + queries_prompt_template = """ Your goal is to provide a list of queries that will be used to perform and embeddings search over different project descriptions and get the ones diff --git a/workers/fund_public_goods/lib/strategy/utils/get_top_matching_projects.py b/workers/fund_public_goods/lib/strategy/utils/get_top_matching_projects.py new file mode 100644 index 0000000..253dbda --- /dev/null +++ b/workers/fund_public_goods/lib/strategy/utils/get_top_matching_projects.py @@ -0,0 +1,38 @@ +from chromadb import EphemeralClient +from fund_public_goods.lib.strategy.models.project import Project +from fund_public_goods.lib.strategy.utils.generate_queries import generate_queries +from fund_public_goods.lib.strategy.utils.utils import get_project_text, remove_duplicate_projects +from langchain_openai import OpenAIEmbeddings +from langchain.vectorstores.chroma import Chroma + + +def get_top_matching_projects(prompt: str, projects: list[Project]) -> list[Project]: + projects_by_id = {project.id: project for project in projects} + queries = generate_queries(prompt=prompt, n=3) + texts: list[str] = [] + metadatas: list[dict] = [] + + for project in projects: + project_text = get_project_text(project=project) + texts.append(project_text) + metadatas.append({ "id": project["id"] }) + + db_client = EphemeralClient() + collection = Chroma.from_texts( + texts=texts, + metadatas=metadatas, + embedding=OpenAIEmbeddings(), + client=db_client, + collection_name="projects" + ) + + top_matches: list[Project] = [] + + for query in queries: + matches = collection.similarity_search(query, k=5) + + for match in matches: + matched_project = projects_by_id[match.metadata["id"]] + top_matches.append(matched_project) + + return remove_duplicate_projects(top_matches) \ No newline at end of file diff --git a/workers/fund_public_goods/lib/strategy/utils/utils.py b/workers/fund_public_goods/lib/strategy/utils/utils.py new file mode 100644 index 0000000..4cd7a31 --- /dev/null +++ b/workers/fund_public_goods/lib/strategy/utils/utils.py @@ -0,0 +1,33 @@ +from fund_public_goods.lib.strategy.models.project import Project + + +def stringify_projects(projects: list[Project], separator: str) -> str: + project_strings = [] + + for project in projects: + project_str = get_project_text(project=project) + project_strings.append(project_str) + + return separator.join(project_strings) + + +def get_project_text(project: Project) -> str: + result = f"ID: {project.id} - Description: {project.description}\n" + + for answer in project.answers: + result += f" Question: {answer.question}\n" + result += f" Answer: {answer.answer}\n" + + return result + + +def remove_duplicate_projects(projects: list[Project]) -> list[Project]: + seen = {} + unique_projects = [] + + for project in projects: + if project.id not in seen: + unique_projects.append(project) + seen[project.id] = True + + return unique_projects \ No newline at end of file diff --git a/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py b/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py index ff4e688..936e889 100644 --- a/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py +++ b/workers/fund_public_goods/workflows/create_strategy/functions/create_strategy.py @@ -1,3 +1,6 @@ +import json +import typing +from fund_public_goods.lib.strategy.utils.get_top_matching_projects import get_top_matching_projects import inngest from supabase import Client from fund_public_goods.lib.strategy.utils.assign_weights import assign_weights @@ -9,40 +12,37 @@ ) from fund_public_goods.lib.strategy.models.project import Project from fund_public_goods.lib.strategy.models.weighted_project import WeightedProject -from fund_public_goods.db.tables.projects import get_projects +from fund_public_goods.db.tables.projects import fetch_projects_data from fund_public_goods.db.tables.runs import get_prompt from fund_public_goods.db.tables.strategy_entries import insert_multiple from fund_public_goods.db import client, logs from fund_public_goods.workflows.create_strategy.events import CreateStrategyEvent -def fetch_projects_data(supabase: Client) -> list[Project]: - response = get_projects(supabase) - projects = [] +StepNames = typing.Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"] +step_names: list[StepNames] = ["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"] - for item in response.data: - answers = [] - for application in item.get("applications", []): - for answer in application.get("answers", []): - answers.append( - { - "question": answer.get("question", ""), - "answer": answer.get("answer", None), - } - ) +def fetch_matching_projects(supabase: Client, prompt: str): + projects = fetch_projects_data(supabase) + matching_projects = get_top_matching_projects(prompt, projects) + + return [project.model_dump() for project in matching_projects] - project = Project( - id=item.get("id", ""), - title=item.get("title", ""), - description=item.get("description", ""), - website=item.get("website", ""), - answers=answers, - ) - projects.append(project) - return projects +def initialize_logs(supabase: Client, run_id: str) -> str: + log_ids: dict[StepNames, str] = {} + + for step_name in step_names: + new_log = logs.create( + db=supabase, + run_id=run_id, + step_name=step_name, + ).data + + log_ids[step_name] = new_log[0]["id"] + return json.dumps(log_ids) @inngest.create_function( fn_id="create_strategy", @@ -55,31 +55,52 @@ async def create_strategy( data = CreateStrategyEvent.Data.model_validate(ctx.event.data) run_id = data.run_id supabase = client.create_admin() - - await step.run( - "extracting_prompt", - lambda: logs.insert(supabase, run_id, "Extracting prompt from run_id"), + + prompt = await step.run( + "extract_prompt", + lambda: get_prompt(supabase, run_id), ) - - prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id)) - + + log_ids_str = await step.run( + "initialize_logs", + lambda: initialize_logs(supabase, run_id), + ) + + log_ids: dict[StepNames, str] = json.loads(log_ids_str) + await step.run( - "fetching_projects_info", - lambda: logs.insert(supabase, run_id, "Getting information from data sources"), + "start_fetch_projects_data", + lambda: logs.update( + db=supabase, + status="IN_PROGRESS", + log_id=log_ids["FETCH_PROJECTS"], + value=None, + ), ) json_projects = await step.run( - "fetch_projects_data", lambda: fetch_projects_data(supabase) + "fetch_projects_data", lambda: fetch_matching_projects(supabase, prompt) ) - projects: list[Project] = [Project(**json_project) for json_project in json_projects] # type: ignore - + projects = [Project(**json_project) for json_project in json_projects] + await step.run( - "assessing", - lambda: logs.insert( - supabase, - run_id, - "Assessing impact of each project related to the users interest", + "completed_fetch_projects_data", + lambda: logs.update( + db=supabase, + status="COMPLETED", + log_id=log_ids["FETCH_PROJECTS"], + value=f"Found {len(projects)} projects related to '{prompt}'", + ), + ) + + await step.run( + "start_assess_projects", + lambda: logs.update( + db=supabase, + status="IN_PROGRESS", + log_id=log_ids["EVALUATE_PROJECTS"], + value=None, ), ) @@ -87,13 +108,24 @@ async def create_strategy( "assess_projects", lambda: evaluate_projects(prompt, projects) ) assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore - + await step.run( - "determining_funding", - lambda: logs.insert( + "completed_assess_projects", + lambda: logs.update( + db=supabase, + status="COMPLETED", + log_id=log_ids["EVALUATE_PROJECTS"], + value=f"Evaluated {len(assessed_projects)} projects", + ), + ) + + await step.run( + "start_determine_funding", + lambda: logs.update( supabase, - run_id, - "Determining the relative funding that the best matching projects need", + status="IN_PROGRESS", + log_id=log_ids["ANALYZE_FUNDING"], + value=None, ), ) @@ -101,16 +133,39 @@ async def create_strategy( "determine_funding", lambda: assign_weights(assessed_projects) ) weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore - + await step.run( - "saving_results_to_db", - lambda: logs.insert(supabase, run_id, "Generating results"), + "completed_determine_funding", + lambda: logs.update( + supabase, + status="COMPLETED", + log_id=log_ids["ANALYZE_FUNDING"], + value="Determined the relative funding that the best matching projects need", + ), + ) + + await step.run( + "start_synthesize_results", + lambda: logs.update( + supabase, + status="IN_PROGRESS", + log_id=log_ids["SYNTHESIZE_RESULTS"], + value=None + ), ) await step.run( "save_strategy_to_db", lambda: insert_multiple(supabase, run_id, weighted_projects) ) - - await step.run("result", lambda: logs.insert(supabase, run_id, "STRATEGY_CREATED")) + + await step.run( + "completed_synthesize_results", + lambda: logs.update( + supabase, + status="COMPLETED", + log_id=log_ids["SYNTHESIZE_RESULTS"], + value="Results generated" + ), + ) return "done"