From 79e0cfb7e9fb268a4d24894c01013b09d9361658 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Wed, 24 Jan 2024 16:16:19 +0100 Subject: [PATCH 01/17] Typo fix + rearranged url patterns --- web/app/{strategy/[id] => [id]/strategy}/page.tsx | 0 workers/fund_public_goods/workers/functions/create_strategy.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename web/app/{strategy/[id] => [id]/strategy}/page.tsx (100%) diff --git a/web/app/strategy/[id]/page.tsx b/web/app/[id]/strategy/page.tsx similarity index 100% rename from web/app/strategy/[id]/page.tsx rename to web/app/[id]/strategy/page.tsx diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py index 944cc60..397bb3b 100644 --- a/workers/fund_public_goods/workers/functions/create_strategy.py +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -79,7 +79,7 @@ async def create_strategy( lambda: logs.insert( supabase, run_id, - "Assessing impact of each project realted to the users interest", + "Assessing impact of each project related to the users interest", ), ) From c8e3530294250f90c18a38283070d54fa0c4b1ac Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Wed, 24 Jan 2024 21:53:30 +0100 Subject: [PATCH 02/17] Added step enum to logs + updated worker's logging accordingly --- web/supabase/dbTypes.ts | 157 +++++++++++++++++- workers/fund_public_goods/api/runs.py | 2 +- workers/fund_public_goods/db/tables/logs.py | 3 + .../workers/functions/create_strategy.py | 42 ++--- 4 files changed, 174 insertions(+), 30 deletions(-) diff --git a/web/supabase/dbTypes.ts b/web/supabase/dbTypes.ts index a55af50..cab0f14 100644 --- a/web/supabase/dbTypes.ts +++ b/web/supabase/dbTypes.ts @@ -9,24 +9,157 @@ export type Json = export interface Database { public: { Tables: { + applications: { + Row: { + answers: Json | null + id: string + project_id: string + recipient: string + round: string + } + Insert: { + answers?: Json | null + id: string + project_id: string + recipient: string + round: string + } + Update: { + answers?: Json | null + id?: string + project_id?: string + recipient?: string + round?: string + } + Relationships: [ + { + foreignKeyName: "applications_project_id_fkey" + columns: ["project_id"] + isOneToOne: false + referencedRelation: "projects" + referencedColumns: ["id"] + } + ] + } + gitcoin_applications: { + Row: { + created_at: string + data: Json + id: string + pointer: string + project_id: string + protocol: number + round_id: string + } + Insert: { + created_at?: string + data: Json + id: string + pointer: string + project_id: string + protocol: number + round_id: string + } + Update: { + created_at?: string + data?: Json + id?: string + pointer?: string + project_id?: string + protocol?: number + round_id?: string + } + Relationships: [ + { + foreignKeyName: "gitcoin_applications_project_id_fkey" + columns: ["project_id"] + isOneToOne: false + referencedRelation: "gitcoin_projects" + referencedColumns: ["id"] + } + ] + } + gitcoin_indexing_jobs: { + Row: { + created_at: string + error: string | null + id: string + is_failed: boolean + is_running: boolean + last_updated_at: string + skip_projects: number + skip_rounds: number + url: string + } + Insert: { + created_at?: string + error?: string | null + id?: string + is_failed?: boolean + is_running?: boolean + last_updated_at?: string + skip_projects?: number + skip_rounds?: number + url: string + } + Update: { + created_at?: string + error?: string | null + id?: string + is_failed?: boolean + is_running?: boolean + last_updated_at?: string + skip_projects?: number + skip_rounds?: number + url?: string + } + Relationships: [] + } + gitcoin_projects: { + Row: { + created_at: string + data: Json + id: string + pointer: string + protocol: number + } + Insert: { + created_at?: string + data: Json + id: string + pointer: string + protocol: number + } + Update: { + created_at?: string + data?: Json + id?: string + pointer?: string + protocol?: number + } + Relationships: [] + } logs: { Row: { created_at: string id: string - message: string run_id: string + step: Database["public"]["Enums"]["step"] + value: string | null } Insert: { created_at?: string id?: string - message: string run_id: string + step: Database["public"]["Enums"]["step"] + value?: string | null } Update: { created_at?: string id?: string - message?: string run_id?: string + step?: Database["public"]["Enums"]["step"] + value?: string | null } Relationships: [ { @@ -42,21 +175,18 @@ export interface Database { Row: { description: string | null id: string - recipient: string | null title: string | null website: string | null } Insert: { description?: string | null - id?: string - recipient?: string | null + id: string title?: string | null website?: string | null } Update: { description?: string | null id?: string - recipient?: string | null title?: string | null website?: string | null } @@ -123,6 +253,13 @@ export interface Database { weight?: number | null } Relationships: [ + { + foreignKeyName: "strategy_entries_project_id_fkey" + columns: ["project_id"] + isOneToOne: false + referencedRelation: "projects" + referencedColumns: ["id"] + }, { foreignKeyName: "strategy_entries_run_id_fkey" columns: ["run_id"] @@ -155,7 +292,11 @@ export interface Database { [_ in never]: never } Enums: { - [_ in never]: never + step: + | "FETCH_PROJECTS" + | "EVALUATE_PROJECTS" + | "ANALYZE_FUNDING" + | "SYNTHESIZE_RESULTS" } CompositeTypes: { [_ in never]: never diff --git a/workers/fund_public_goods/api/runs.py b/workers/fund_public_goods/api/runs.py index 88a7b3a..0498513 100644 --- a/workers/fund_public_goods/api/runs.py +++ b/workers/fund_public_goods/api/runs.py @@ -31,7 +31,7 @@ async def runs(worker_id: str, params: Params) -> Response: run_id = tables.runs.insert(supabase, worker_id, prompt) await inngest_client.send( - CreateStrategyEvent.Data(prompt=prompt, run_id=run_id).to_event() + CreateStrategyEvent.Data(run_id=run_id).to_event() ) return Response(run_id=run_id) diff --git a/workers/fund_public_goods/db/tables/logs.py b/workers/fund_public_goods/db/tables/logs.py index bac80f8..9358b73 100644 --- a/workers/fund_public_goods/db/tables/logs.py +++ b/workers/fund_public_goods/db/tables/logs.py @@ -1,11 +1,14 @@ +from typing import Literal from supabase import Client def insert( db: Client, run_id: str, + step: Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"], message: str ): db.table("logs").insert({ "run_id": run_id, + "step": step, "message": message }).execute() diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py index 397bb3b..7a6018b 100644 --- a/workers/fund_public_goods/workers/functions/create_strategy.py +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -56,30 +56,21 @@ async def create_strategy( run_id = data.run_id supabase = client.create_admin() - await step.run( - "extracting_prompt", - lambda: logs.insert(supabase, run_id, "Extracting prompt from run_id"), - ) - prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id)) - await step.run( - "fetching_projects_info", - lambda: logs.insert(supabase, run_id, "Getting information from data sources"), - ) - json_projects = await step.run( "fetch_projects_data", lambda: fetch_projects_data(supabase) ) projects: list[Project] = [Project(**json_project) for json_project in json_projects] # type: ignore - + await step.run( - "assessing", + "fetched_projects_data", lambda: logs.insert( supabase, run_id, - "Assessing impact of each project related to the users interest", + "FETCH_PROJECTS", + f"Found {len(projects)} projects", ), ) @@ -87,13 +78,14 @@ async def create_strategy( "assess_projects", lambda: evaluate_projects(prompt, projects) ) assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore - + await step.run( - "determining_funding", + "assessed_projects", lambda: logs.insert( supabase, run_id, - "Determining the relative funding that the best matching projects need", + "EVALUATE_PROJECTS", + f"Evaluated {len(assessed_projects)} projects", ), ) @@ -101,16 +93,24 @@ async def create_strategy( "determine_funding", lambda: assign_weights(assessed_projects) ) weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore - + await step.run( - "saving_results_to_db", - lambda: logs.insert(supabase, run_id, "Generating results"), + "determined_funding", + lambda: logs.insert( + supabase, + run_id, + "ANALYZE_FUNDING", + "Determined the relative funding that the best matching projects need", + ), ) await step.run( "save_strategy_to_db", lambda: insert_multiple(supabase, run_id, weighted_projects) ) - - await step.run("result", lambda: logs.insert(supabase, run_id, "STRATEGY_CREATED")) + + await step.run( + "saved_results_to_db", + lambda: logs.insert(supabase, run_id, "SYNTHESIZE_RESULTS", "Results generated"), + ) return "done" From 6f5f5611d596062ec37450e3209a8d07767270cb Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 00:02:36 +0100 Subject: [PATCH 03/17] Changed log message -> value --- workers/fund_public_goods/db/tables/logs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/fund_public_goods/db/tables/logs.py b/workers/fund_public_goods/db/tables/logs.py index 9358b73..71f11ec 100644 --- a/workers/fund_public_goods/db/tables/logs.py +++ b/workers/fund_public_goods/db/tables/logs.py @@ -10,5 +10,5 @@ def insert( db.table("logs").insert({ "run_id": run_id, "step": step, - "message": message + "value": message }).execute() From bc86e2b58c46f2301ca89f8206ca7bb0a58f7b4f Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 01:41:45 +0100 Subject: [PATCH 04/17] Progress page logic --- web/app/[runId]/progress/page.tsx | 13 +++ web/app/{[id] => [runId]}/strategy/page.tsx | 0 web/components/Logs.tsx | 46 +++++++++ web/components/Prompt.tsx | 44 +-------- web/components/RealtimeLogs.tsx | 102 ++++++++++++++++++++ web/supabase/dbTypes.ts | 43 +++++++-- 6 files changed, 200 insertions(+), 48 deletions(-) create mode 100644 web/app/[runId]/progress/page.tsx rename web/app/{[id] => [runId]}/strategy/page.tsx (100%) create mode 100644 web/components/Logs.tsx create mode 100644 web/components/RealtimeLogs.tsx diff --git a/web/app/[runId]/progress/page.tsx b/web/app/[runId]/progress/page.tsx new file mode 100644 index 0000000..b9ccbb8 --- /dev/null +++ b/web/app/[runId]/progress/page.tsx @@ -0,0 +1,13 @@ +import Logs from "@/components/Logs"; + +export default function ProgressPage(props: { + params: { + runId: string + } +}) { + return ( +
+ +
+ ) +} \ No newline at end of file diff --git a/web/app/[id]/strategy/page.tsx b/web/app/[runId]/strategy/page.tsx similarity index 100% rename from web/app/[id]/strategy/page.tsx rename to web/app/[runId]/strategy/page.tsx diff --git a/web/components/Logs.tsx b/web/components/Logs.tsx new file mode 100644 index 0000000..6a96170 --- /dev/null +++ b/web/components/Logs.tsx @@ -0,0 +1,46 @@ +"use server" + +import { createSupabaseServerClient } from "@/utils/supabase-server"; +import RealtimeLogs from "./RealtimeLogs"; + +export default async function Logs(props: { runId: string }) { + const supabase = createSupabaseServerClient() + + const { data: steps } = await supabase.from('steps').select(` + id, + name, + order + `) + + if (!steps) { + throw new Error(`Error fetching steps`) + } + + const { data: run } = await supabase.from('runs').select(` + id, + prompt, + logs( + id, + run_id, + created_at, + value, + step_id, + ended_at, + status, + steps( + id, + name, + order + ) + ) + `).eq("id", props.runId).single() + + + if (!run) { + throw new Error(`Run with ID '${props.runId}' not found`) + } + + return ( + + ) +} \ No newline at end of file diff --git a/web/components/Prompt.tsx b/web/components/Prompt.tsx index a0fab83..e5aaf9c 100644 --- a/web/components/Prompt.tsx +++ b/web/components/Prompt.tsx @@ -1,13 +1,10 @@ "use client"; -import { ChangeEvent, useEffect, useState } from "react"; +import { ChangeEvent, useState } from "react"; import TextField from "./TextField"; import ChatInputButton from "./ChatInputButton"; import { SparkleIcon } from "./Icons"; -import Image from "next/image"; import { startWorker } from "@/app/actions"; -import { createSupabaseBrowserClient } from "@/utils/supabase-browser"; -import { Tables } from "@/supabase/dbTypes"; import { useRouter } from "next/navigation"; import LoadingCircle from "./LoadingCircle"; @@ -24,60 +21,27 @@ const PROMPT_SUGESTIONS = [ export default function Prompt() { const [prompt, setPrompt] = useState(""); const [isWaiting, setIsWaiting] = useState(false); - const [workerId, setWorkerId] = useState(); - const [runId, setRunId] = useState(); - const [status, setStatus] = useState(); const router = useRouter(); - const supabase = createSupabaseBrowserClient(); const sendPrompt = async (prompt: string) => { setIsWaiting(true); try { const response = await startWorker(prompt); - setWorkerId(response.workerId); - setRunId(response.runId); + router.push(`/${response.runId}/progress`) } finally { setIsWaiting(false); } }; - useEffect(() => { - if (runId) { - const channel = supabase - .channel("logs-added") - .on( - "postgres_changes", - { - event: "INSERT", - table: "logs", - schema: "public", - filter: `run_id=eq.${runId}`, - }, - (payload: { new: Tables<"logs"> }) => { - if (payload.new.message === "STRATEGY_CREATED") { - router.push(`strategy/${workerId}`); - return; - } - setStatus(payload.new.message); - } - ) - .subscribe(); - - return () => { - supabase.removeChannel(channel); - }; - } - }, [workerId, supabase, runId, workerId]); - return ( <>
- {status ? ( + {isWaiting ? (
-
{status}
+
Loading
) : ( diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx new file mode 100644 index 0000000..3b0cd15 --- /dev/null +++ b/web/components/RealtimeLogs.tsx @@ -0,0 +1,102 @@ +"use client" + +import { Tables } from "@/supabase/dbTypes"; +import { createSupabaseBrowserClient } from "@/utils/supabase-browser"; +import router from "next/router"; +import { useState, useEffect } from "react"; + +type LogWithStep = (Tables<"logs"> & { steps: Tables<"steps"> | null }) + +const UNSTARTED_TEXTS: Record["name"], string> = { + FETCH_PROJECTS: "Search for relevant projects", + EVALUATE_PROJECTS: "Evaluate proof of impact", + ANALYZE_FUNDING: "Analyze funding needs", + SYNTHESIZE_RESULTS: "Synthesize results", +} + +const LOADING_TEXTS: Record["name"], string> = { + FETCH_PROJECTS: "Searching for relevant projects", + EVALUATE_PROJECTS: "Evaluating proof of impact", + ANALYZE_FUNDING: "Analyzing funding needs", + SYNTHESIZE_RESULTS: "Synthesizing results", +} + +const getLogMessage = (log: LogWithStep) => { + switch (log.status) { + case "NOT_STARTED": return UNSTARTED_TEXTS[log.steps!.name] + case "IN_PROGRESS": return LOADING_TEXTS[log.steps!.name] + case "COMPLETED": return log.value ?? `Completed: ${UNSTARTED_TEXTS[log.steps!.name]}` + case "ERRORED": return `Error while ${LOADING_TEXTS[log.steps!.name].toLowerCase()}` + } +} + +export default function RealtimeLogs(props: { + logs: LogWithStep[] + steps: Tables<"steps">[] + run: { + id: string; + prompt: string; + } +}) { + const [logs, setLogs] = useState(props.logs) + const supabase = createSupabaseBrowserClient(); + const runId = props.run + + const orderedSteps = props.steps.sort((a, b) => a.order - b.order) + const lastStep = orderedSteps.slice(-1)[0] + + const sortedLogsWithSteps = logs.sort((a, b) => { + return a.steps!.order - b.steps!.order + }) + + useEffect(() => { + const channel = supabase + .channel("logs-added") + .on( + "postgres_changes", + { + event: "UPDATE", + table: "logs", + schema: "public", + filter: `run_id=eq.${runId}`, + }, + (payload: { new: Tables<"logs"> }) => { + const logsMinusTheUpdatedOne = logs.filter(log => log.id !== payload.new.id) + const newWithStep = { + ...payload.new, + steps: props.steps.find(step => step.id === payload.new.step_id) ?? null + } + + setLogs([...logsMinusTheUpdatedOne, newWithStep]) + + if (payload.new.step_id === lastStep.id && payload.new.status === "COMPLETED") { + router.push(`/${runId}/strategy`) + return; + } + } + ) + .subscribe(); + + return () => { + supabase.removeChannel(channel); + }; + }, [supabase, runId]); + + return ( +
+
+ Results for: +
+ {props.run.prompt} +
+
+
+ { sortedLogsWithSteps.map(logWithStep => ( +
+ { getLogMessage(logWithStep) } +
+ )) } +
+
+ ) +} \ No newline at end of file diff --git a/web/supabase/dbTypes.ts b/web/supabase/dbTypes.ts index cab0f14..b655cb3 100644 --- a/web/supabase/dbTypes.ts +++ b/web/supabase/dbTypes.ts @@ -142,23 +142,29 @@ export interface Database { logs: { Row: { created_at: string + ended_at: string | null id: string run_id: string - step: Database["public"]["Enums"]["step"] + status: Database["public"]["Enums"]["step_status"] + step_id: string value: string | null } Insert: { created_at?: string + ended_at?: string | null id?: string run_id: string - step: Database["public"]["Enums"]["step"] + status: Database["public"]["Enums"]["step_status"] + step_id: string value?: string | null } Update: { created_at?: string + ended_at?: string | null id?: string run_id?: string - step?: Database["public"]["Enums"]["step"] + status?: Database["public"]["Enums"]["step_status"] + step_id?: string value?: string | null } Relationships: [ @@ -168,6 +174,13 @@ export interface Database { isOneToOne: false referencedRelation: "runs" referencedColumns: ["id"] + }, + { + foreignKeyName: "logs_step_id_fkey" + columns: ["step_id"] + isOneToOne: false + referencedRelation: "steps" + referencedColumns: ["id"] } ] } @@ -221,6 +234,24 @@ export interface Database { } ] } + steps: { + Row: { + id: string + name: string + order: number + } + Insert: { + id?: string + name: string + order: number + } + Update: { + id?: string + name?: string + order?: number + } + Relationships: [] + } strategy_entries: { Row: { created_at: string @@ -292,11 +323,7 @@ export interface Database { [_ in never]: never } Enums: { - step: - | "FETCH_PROJECTS" - | "EVALUATE_PROJECTS" - | "ANALYZE_FUNDING" - | "SYNTHESIZE_RESULTS" + step_status: "NOT_STARTED" | "IN_PROGRESS" | "COMPLETED" | "ERRORED" } CompositeTypes: { [_ in never]: never From 66fee471c65f81710dba43018a2a909f34d7c756 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 03:08:17 +0100 Subject: [PATCH 05/17] Simplified DB schema for step names --- web/components/Logs.tsx | 19 ++------------- web/components/RealtimeLogs.tsx | 42 +++++++++++++++------------------ web/supabase/dbTypes.ts | 36 +++++++--------------------- 3 files changed, 29 insertions(+), 68 deletions(-) diff --git a/web/components/Logs.tsx b/web/components/Logs.tsx index 6a96170..80b2c47 100644 --- a/web/components/Logs.tsx +++ b/web/components/Logs.tsx @@ -6,16 +6,6 @@ import RealtimeLogs from "./RealtimeLogs"; export default async function Logs(props: { runId: string }) { const supabase = createSupabaseServerClient() - const { data: steps } = await supabase.from('steps').select(` - id, - name, - order - `) - - if (!steps) { - throw new Error(`Error fetching steps`) - } - const { data: run } = await supabase.from('runs').select(` id, prompt, @@ -24,14 +14,9 @@ export default async function Logs(props: { runId: string }) { run_id, created_at, value, - step_id, ended_at, status, - steps( - id, - name, - order - ) + step_name ) `).eq("id", props.runId).single() @@ -41,6 +26,6 @@ export default async function Logs(props: { runId: string }) { } return ( - + ) } \ No newline at end of file diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx index 3b0cd15..3fe6285 100644 --- a/web/components/RealtimeLogs.tsx +++ b/web/components/RealtimeLogs.tsx @@ -5,48 +5,49 @@ import { createSupabaseBrowserClient } from "@/utils/supabase-browser"; import router from "next/router"; import { useState, useEffect } from "react"; -type LogWithStep = (Tables<"logs"> & { steps: Tables<"steps"> | null }) - -const UNSTARTED_TEXTS: Record["name"], string> = { +const UNSTARTED_TEXTS: Record["step_name"], string> = { FETCH_PROJECTS: "Search for relevant projects", EVALUATE_PROJECTS: "Evaluate proof of impact", ANALYZE_FUNDING: "Analyze funding needs", SYNTHESIZE_RESULTS: "Synthesize results", } -const LOADING_TEXTS: Record["name"], string> = { +const LOADING_TEXTS: Record["step_name"], string> = { FETCH_PROJECTS: "Searching for relevant projects", EVALUATE_PROJECTS: "Evaluating proof of impact", ANALYZE_FUNDING: "Analyzing funding needs", SYNTHESIZE_RESULTS: "Synthesizing results", } -const getLogMessage = (log: LogWithStep) => { +const STEPS_ORDER: Record["step_name"], number> = { + FETCH_PROJECTS: 1, + EVALUATE_PROJECTS: 2, + ANALYZE_FUNDING: 3, + SYNTHESIZE_RESULTS: 4, +} + +const getLogMessage = (log: Tables<"logs">) => { switch (log.status) { - case "NOT_STARTED": return UNSTARTED_TEXTS[log.steps!.name] - case "IN_PROGRESS": return LOADING_TEXTS[log.steps!.name] - case "COMPLETED": return log.value ?? `Completed: ${UNSTARTED_TEXTS[log.steps!.name]}` - case "ERRORED": return `Error while ${LOADING_TEXTS[log.steps!.name].toLowerCase()}` + case "NOT_STARTED": return UNSTARTED_TEXTS[log.step_name] + case "IN_PROGRESS": return LOADING_TEXTS[log.step_name] + case "COMPLETED": return log.value ?? `Completed: ${UNSTARTED_TEXTS[log.step_name]}` + case "ERRORED": return `Error while ${LOADING_TEXTS[log.step_name].toLowerCase()}` } } export default function RealtimeLogs(props: { - logs: LogWithStep[] - steps: Tables<"steps">[] + logs: Tables<"logs">[] run: { id: string; prompt: string; } }) { - const [logs, setLogs] = useState(props.logs) + const [logs, setLogs] = useState[]>(props.logs) const supabase = createSupabaseBrowserClient(); const runId = props.run - - const orderedSteps = props.steps.sort((a, b) => a.order - b.order) - const lastStep = orderedSteps.slice(-1)[0] const sortedLogsWithSteps = logs.sort((a, b) => { - return a.steps!.order - b.steps!.order + return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name] }) useEffect(() => { @@ -62,14 +63,9 @@ export default function RealtimeLogs(props: { }, (payload: { new: Tables<"logs"> }) => { const logsMinusTheUpdatedOne = logs.filter(log => log.id !== payload.new.id) - const newWithStep = { - ...payload.new, - steps: props.steps.find(step => step.id === payload.new.step_id) ?? null - } - - setLogs([...logsMinusTheUpdatedOne, newWithStep]) + setLogs([...logsMinusTheUpdatedOne, payload.new]) - if (payload.new.step_id === lastStep.id && payload.new.status === "COMPLETED") { + if (payload.new.step_name === "SYNTHESIZE_RESULTS" && payload.new.status === "COMPLETED") { router.push(`/${runId}/strategy`) return; } diff --git a/web/supabase/dbTypes.ts b/web/supabase/dbTypes.ts index b655cb3..4003c34 100644 --- a/web/supabase/dbTypes.ts +++ b/web/supabase/dbTypes.ts @@ -146,7 +146,7 @@ export interface Database { id: string run_id: string status: Database["public"]["Enums"]["step_status"] - step_id: string + step_name: Database["public"]["Enums"]["step_name"] value: string | null } Insert: { @@ -155,7 +155,7 @@ export interface Database { id?: string run_id: string status: Database["public"]["Enums"]["step_status"] - step_id: string + step_name: Database["public"]["Enums"]["step_name"] value?: string | null } Update: { @@ -164,7 +164,7 @@ export interface Database { id?: string run_id?: string status?: Database["public"]["Enums"]["step_status"] - step_id?: string + step_name?: Database["public"]["Enums"]["step_name"] value?: string | null } Relationships: [ @@ -174,13 +174,6 @@ export interface Database { isOneToOne: false referencedRelation: "runs" referencedColumns: ["id"] - }, - { - foreignKeyName: "logs_step_id_fkey" - columns: ["step_id"] - isOneToOne: false - referencedRelation: "steps" - referencedColumns: ["id"] } ] } @@ -234,24 +227,6 @@ export interface Database { } ] } - steps: { - Row: { - id: string - name: string - order: number - } - Insert: { - id?: string - name: string - order: number - } - Update: { - id?: string - name?: string - order?: number - } - Relationships: [] - } strategy_entries: { Row: { created_at: string @@ -323,6 +298,11 @@ export interface Database { [_ in never]: never } Enums: { + step_name: + | "FETCH_PROJECTS" + | "EVALUATE_PROJECTS" + | "ANALYZE_FUNDING" + | "SYNTHESIZE_RESULTS" step_status: "NOT_STARTED" | "IN_PROGRESS" | "COMPLETED" | "ERRORED" } CompositeTypes: { From 88d85782aa6bf8655137cd43cf6aa0c75435a62c Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 03:26:18 +0100 Subject: [PATCH 06/17] Updated logging according to new schema --- workers/fund_public_goods/db/tables/logs.py | 27 +++- .../workers/functions/create_strategy.py | 126 ++++++++++++++---- 2 files changed, 118 insertions(+), 35 deletions(-) diff --git a/workers/fund_public_goods/db/tables/logs.py b/workers/fund_public_goods/db/tables/logs.py index 71f11ec..93e9967 100644 --- a/workers/fund_public_goods/db/tables/logs.py +++ b/workers/fund_public_goods/db/tables/logs.py @@ -1,14 +1,29 @@ from typing import Literal from supabase import Client +import datetime -def insert( +def create( db: Client, run_id: str, - step: Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"], - message: str + step_name: str, ): - db.table("logs").insert({ + return db.table("logs").insert({ "run_id": run_id, - "step": step, - "value": message + "step_name": step_name, }).execute() + +def update( + db: Client, + log_id: str, + status: Literal["IN_PROGRESS", "COMPLETED", "ERRORED"], + value: str | None +): + ended_at = None + if status == "COMPLETED" or status == "ERRORED": + ended_at = datetime.datetime.now().isoformat() + + return db.table("logs").update({ + "status": status, + "value": value, + "ended_at": ended_at + }).eq("id", log_id).execute() \ No newline at end of file diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py index 7a6018b..9bb18f0 100644 --- a/workers/fund_public_goods/workers/functions/create_strategy.py +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -2,11 +2,13 @@ from fund_public_goods.agents.researcher.functions.evaluate_projects import ( evaluate_projects, ) +from fund_public_goods.agents.researcher.models.answer import Answer from fund_public_goods.agents.researcher.models.evaluated_project import ( EvaluatedProject, ) from fund_public_goods.agents.researcher.models.project import Project from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject +from fund_public_goods.db.tables.steps import get_steps import inngest from fund_public_goods.db.tables.projects import get_projects from fund_public_goods.db.tables.runs import get_prompt @@ -14,24 +16,28 @@ from fund_public_goods.workers.events import CreateStrategyEvent from fund_public_goods.db import client, logs from supabase import Client +import typing + + +StepNames = typing.Literal["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"] +step_names: list[StepNames] = ["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"] def fetch_projects_data(supabase: Client) -> list[Project]: response = get_projects(supabase) - projects = [] + + projects: list[Project] = [] for item in response.data: - answers = [] + answers: list[Answer] = [] for application in item.get("applications", []): for answer in application.get("answers", []): - answers.append( - { - "question": answer.get("question", ""), - "answer": answer.get("answer", None), - } - ) - + answers.append(Answer( + question=answer.get("question", ""), + answer=answer.get("answer", None) + )) + project = Project( id=item.get("id", ""), title=item.get("title", ""), @@ -39,11 +45,26 @@ def fetch_projects_data(supabase: Client) -> list[Project]: website=item.get("website", ""), answers=answers, ) + projects.append(project) return projects +def initialize_logs(supabase: Client, run_id: str) -> dict[StepNames, str]: + log_ids: dict[StepNames, str] = {} + + for step_name in step_names: + new_log = logs.create( + db=supabase, + run_id=run_id, + step_name=step_name, + ).data + + log_ids[step_name] = new_log[0].id + + return log_ids + @inngest.create_function( fn_id="on_create_strategy", trigger=CreateStrategyEvent.trigger, @@ -55,22 +76,44 @@ async def create_strategy( data = CreateStrategyEvent.Data.model_validate(ctx.event.data) run_id = data.run_id supabase = client.create_admin() - + prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id)) + + log_ids = await step.run("initialize_logs", lambda: initialize_logs(supabase, run_id)) + + await step.run( + "start_fetch_projects_data", + lambda: logs.update( + db=supabase, + status="IN_PROGRESS", + log_id=log_ids["FETCH_PROJECTS"], + value=None, + ), + ) json_projects = await step.run( "fetch_projects_data", lambda: fetch_projects_data(supabase) ) - projects: list[Project] = [Project(**json_project) for json_project in json_projects] # type: ignore + projects = [Project(**json_project) for json_project in json_projects] # type: ignore await step.run( - "fetched_projects_data", - lambda: logs.insert( - supabase, - run_id, - "FETCH_PROJECTS", - f"Found {len(projects)} projects", + "completed_fetch_projects_data", + lambda: logs.update( + db=supabase, + status="COMPLETED", + log_id=log_ids["FETCH_PROJECTS"], + value=f"Found {len(projects)} projects", + ), + ) + + await step.run( + "start_assess_projects", + lambda: logs.update( + db=supabase, + status="IN_PROGRESS", + log_id=log_ids["EVALUATE_PROJECTS"], + value=None, ), ) @@ -80,12 +123,22 @@ async def create_strategy( assessed_projects = [EvaluatedProject(**x) for x in json_asessed_projects] # type: ignore await step.run( - "assessed_projects", - lambda: logs.insert( + "completed_assess_projects", + lambda: logs.update( + db=supabase, + status="COMPLETED", + log_id=log_ids["EVALUATE_PROJECTS"], + value=f"Evaluated {len(assessed_projects)} projects", + ), + ) + + await step.run( + "start_determine_funding", + lambda: logs.update( supabase, - run_id, - "EVALUATE_PROJECTS", - f"Evaluated {len(assessed_projects)} projects", + status="IN_PROGRESS", + log_id=log_ids["ANALYZE_FUNDING"], + value=None, ), ) @@ -95,12 +148,22 @@ async def create_strategy( weighted_projects = [WeightedProject(**x) for x in json_weighted_projects] # type: ignore await step.run( - "determined_funding", - lambda: logs.insert( + "completed_determine_funding", + lambda: logs.update( supabase, - run_id, - "ANALYZE_FUNDING", - "Determined the relative funding that the best matching projects need", + status="COMPLETED", + log_id=log_ids["ANALYZE_FUNDING"], + value="Determined the relative funding that the best matching projects need", + ), + ) + + await step.run( + "start_synthesize_results", + lambda: logs.update( + supabase, + status="IN_PROGRESS", + log_id=log_ids["SYNTHESIZE_RESULTS"], + value=None ), ) @@ -109,8 +172,13 @@ async def create_strategy( ) await step.run( - "saved_results_to_db", - lambda: logs.insert(supabase, run_id, "SYNTHESIZE_RESULTS", "Results generated"), + "completed_synthesize_results", + lambda: logs.update( + supabase, + status="COMPLETED", + log_id=log_ids["SYNTHESIZE_RESULTS"], + value="Results generated" + ), ) return "done" From c940e88bd9712c2449d137b9d85934190376f179 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 16:53:20 +0100 Subject: [PATCH 07/17] Bugfixes --- workers/fund_public_goods/db/tables/logs.py | 1 + workers/fund_public_goods/db/tables/runs.py | 2 +- .../workers/functions/create_strategy.py | 8 +++----- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/workers/fund_public_goods/db/tables/logs.py b/workers/fund_public_goods/db/tables/logs.py index 93e9967..f214527 100644 --- a/workers/fund_public_goods/db/tables/logs.py +++ b/workers/fund_public_goods/db/tables/logs.py @@ -10,6 +10,7 @@ def create( return db.table("logs").insert({ "run_id": run_id, "step_name": step_name, + "status": "NOT_STARTED" }).execute() def update( diff --git a/workers/fund_public_goods/db/tables/runs.py b/workers/fund_public_goods/db/tables/runs.py index caa9f7a..f71d888 100644 --- a/workers/fund_public_goods/db/tables/runs.py +++ b/workers/fund_public_goods/db/tables/runs.py @@ -19,5 +19,5 @@ def get_prompt(db: Client, run_id: str) -> str: .limit(1) .single() .execute() - .data + .data["prompt"] ) diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py index 9bb18f0..70d8bcb 100644 --- a/workers/fund_public_goods/workers/functions/create_strategy.py +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -8,7 +8,6 @@ ) from fund_public_goods.agents.researcher.models.project import Project from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject -from fund_public_goods.db.tables.steps import get_steps import inngest from fund_public_goods.db.tables.projects import get_projects from fund_public_goods.db.tables.runs import get_prompt @@ -61,7 +60,7 @@ def initialize_logs(supabase: Client, run_id: str) -> dict[StepNames, str]: step_name=step_name, ).data - log_ids[step_name] = new_log[0].id + log_ids[step_name] = new_log[0]["id"] return log_ids @@ -77,9 +76,8 @@ async def create_strategy( run_id = data.run_id supabase = client.create_admin() - prompt = await step.run("extract_prompt", lambda: get_prompt(supabase, run_id)) - - log_ids = await step.run("initialize_logs", lambda: initialize_logs(supabase, run_id)) + prompt = get_prompt(supabase, run_id) + log_ids = initialize_logs(supabase, run_id) await step.run( "start_fetch_projects_data", From ba7d024cca4e73e607bcddde85235cc0836fd6f0 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 18:05:57 +0100 Subject: [PATCH 08/17] Max response size bug solved --- .../researcher/functions/assign_weights.py | 2 +- .../researcher/functions/evaluate_projects.py | 68 +------------------ .../researcher/functions/generate_queries.py | 1 + .../functions/get_top_matching_projects.py | 38 +++++++++++ .../agents/researcher/utils/__init__.py | 0 .../agents/researcher/utils/projects.py | 33 +++++++++ .../fund_public_goods/db/tables/projects.py | 29 ++++++++ .../workers/functions/create_strategy.py | 38 +++-------- 8 files changed, 113 insertions(+), 96 deletions(-) create mode 100644 workers/fund_public_goods/agents/researcher/functions/get_top_matching_projects.py create mode 100644 workers/fund_public_goods/agents/researcher/utils/__init__.py create mode 100644 workers/fund_public_goods/agents/researcher/utils/projects.py diff --git a/workers/fund_public_goods/agents/researcher/functions/assign_weights.py b/workers/fund_public_goods/agents/researcher/functions/assign_weights.py index 2f9bb64..25247bd 100644 --- a/workers/fund_public_goods/agents/researcher/functions/assign_weights.py +++ b/workers/fund_public_goods/agents/researcher/functions/assign_weights.py @@ -3,9 +3,9 @@ from langchain_core.output_parsers import StrOutputParser from langchain_core.output_parsers.json import JsonOutputParser from fund_public_goods.agents.researcher.models.evaluated_project import EvaluatedProject +from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject import json -from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject assign_weights_prompt_template = """ You are a specialist in public goods funding. You will receive a list of projects diff --git a/workers/fund_public_goods/agents/researcher/functions/evaluate_projects.py b/workers/fund_public_goods/agents/researcher/functions/evaluate_projects.py index 2f120d1..6f5583b 100644 --- a/workers/fund_public_goods/agents/researcher/functions/evaluate_projects.py +++ b/workers/fund_public_goods/agents/researcher/functions/evaluate_projects.py @@ -1,76 +1,13 @@ -from chromadb import EphemeralClient +from fund_public_goods.agents.researcher.utils.projects import stringify_projects from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.output_parsers.json import JsonOutputParser -from langchain_openai import OpenAIEmbeddings -from fund_public_goods.agents.researcher.functions.generate_queries import generate_queries from fund_public_goods.agents.researcher.models.evaluated_project import EvaluatedProject from fund_public_goods.agents.researcher.models.project import Project -from langchain.vectorstores.chroma import Chroma from fund_public_goods.agents.researcher.models.project_evaluation import ProjectEvaluation -def stringify_projects(projects: list[Project], separator: str) -> str: - project_strings = [] - - for project in projects: - project_str = get_project_text(project=project) - project_strings.append(project_str) - - return separator.join(project_strings) - -def get_project_text(project: Project) -> str: - result = f"ID: {project.id} - Description: {project.description}\n" - - for answer in project.answers: - result += f" Question: {answer.question}\n" - result += f" Answer: {answer.answer}\n" - - return result - -def remove_duplicate_projects(projects: list[Project]) -> list[Project]: - seen = {} - unique_projects = [] - - for project in projects: - if project.id not in seen: - unique_projects.append(project) - seen[project.id] = True - - return unique_projects - -def get_top_matching_projects(prompt: str, projects: list[Project]) -> list[Project]: - projects_by_id = {project.id: project for project in projects} - queries = generate_queries(prompt=prompt, n=3) - texts: list[str] = [] - metadatas: list[dict] = [] - - for project in projects: - project_text = get_project_text(project=project) - texts.append(project_text) - metadatas.append({ "id": project["id"] }) - - db_client = EphemeralClient() - collection = Chroma.from_texts( - texts=texts, - metadatas=metadatas, - embedding=OpenAIEmbeddings(), - client=db_client, - collection_name="projects" - ) - - top_matches: list[Project] = [] - - for query in queries: - matches = collection.similarity_search(query, k=5) - - for match in matches: - matched_project = projects_by_id[match.metadata["id"]] - top_matches.append(matched_project) - - return remove_duplicate_projects(top_matches) - extract_evaluations_prompts_template = """ You will be given a list of project evaluations that measure how well each project matches the user's interest, and its impact in regards to that interest. @@ -139,7 +76,6 @@ def extract_project_evaluations(evaluation_report: str) -> list[ProjectEvaluatio def evaluate_projects(prompt: str, projects: list[Project]) -> list[EvaluatedProject]: projects_by_id = {project.id: project for project in projects} - top_matching_projects = get_top_matching_projects(prompt=prompt, projects=projects) evaluation_prompt = ChatPromptTemplate.from_messages([ ("system", evaluation_prompt_template), @@ -154,7 +90,7 @@ def evaluate_projects(prompt: str, projects: list[Project]) -> list[EvaluatedPro evaluation_report = evaluation_chain.invoke({ "prompt": prompt, "separator": separator, - "projects": stringify_projects(projects=top_matching_projects, separator=separator) + "projects": stringify_projects(projects=projects, separator=separator) }) evaluations = extract_project_evaluations(evaluation_report=evaluation_report) diff --git a/workers/fund_public_goods/agents/researcher/functions/generate_queries.py b/workers/fund_public_goods/agents/researcher/functions/generate_queries.py index bbd62ce..1d3c0c7 100644 --- a/workers/fund_public_goods/agents/researcher/functions/generate_queries.py +++ b/workers/fund_public_goods/agents/researcher/functions/generate_queries.py @@ -2,6 +2,7 @@ from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import CommaSeparatedListOutputParser + queries_prompt_template = """ Your goal is to provide a list of queries that will be used to perform and embeddings search over different project descriptions and get the ones diff --git a/workers/fund_public_goods/agents/researcher/functions/get_top_matching_projects.py b/workers/fund_public_goods/agents/researcher/functions/get_top_matching_projects.py new file mode 100644 index 0000000..7cc3266 --- /dev/null +++ b/workers/fund_public_goods/agents/researcher/functions/get_top_matching_projects.py @@ -0,0 +1,38 @@ +from chromadb import EphemeralClient +from fund_public_goods.agents.researcher.functions.generate_queries import generate_queries +from fund_public_goods.agents.researcher.models.project import Project +from fund_public_goods.agents.researcher.utils.projects import get_project_text, remove_duplicate_projects +from langchain_openai import OpenAIEmbeddings +from langchain.vectorstores.chroma import Chroma + + +def get_top_matching_projects(prompt: str, projects: list[Project]) -> list[Project]: + projects_by_id = {project.id: project for project in projects} + queries = generate_queries(prompt=prompt, n=3) + texts: list[str] = [] + metadatas: list[dict] = [] + + for project in projects: + project_text = get_project_text(project=project) + texts.append(project_text) + metadatas.append({ "id": project["id"] }) + + db_client = EphemeralClient() + collection = Chroma.from_texts( + texts=texts, + metadatas=metadatas, + embedding=OpenAIEmbeddings(), + client=db_client, + collection_name="projects" + ) + + top_matches: list[Project] = [] + + for query in queries: + matches = collection.similarity_search(query, k=5) + + for match in matches: + matched_project = projects_by_id[match.metadata["id"]] + top_matches.append(matched_project) + + return remove_duplicate_projects(top_matches) \ No newline at end of file diff --git a/workers/fund_public_goods/agents/researcher/utils/__init__.py b/workers/fund_public_goods/agents/researcher/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/fund_public_goods/agents/researcher/utils/projects.py b/workers/fund_public_goods/agents/researcher/utils/projects.py new file mode 100644 index 0000000..36a9d68 --- /dev/null +++ b/workers/fund_public_goods/agents/researcher/utils/projects.py @@ -0,0 +1,33 @@ +from fund_public_goods.agents.researcher.models.project import Project + + +def stringify_projects(projects: list[Project], separator: str) -> str: + project_strings = [] + + for project in projects: + project_str = get_project_text(project=project) + project_strings.append(project_str) + + return separator.join(project_strings) + + +def get_project_text(project: Project) -> str: + result = f"ID: {project.id} - Description: {project.description}\n" + + for answer in project.answers: + result += f" Question: {answer.question}\n" + result += f" Answer: {answer.answer}\n" + + return result + + +def remove_duplicate_projects(projects: list[Project]) -> list[Project]: + seen = {} + unique_projects = [] + + for project in projects: + if project.id not in seen: + unique_projects.append(project) + seen[project.id] = True + + return unique_projects \ No newline at end of file diff --git a/workers/fund_public_goods/db/tables/projects.py b/workers/fund_public_goods/db/tables/projects.py index dbd9b04..7e339d4 100644 --- a/workers/fund_public_goods/db/tables/projects.py +++ b/workers/fund_public_goods/db/tables/projects.py @@ -1,4 +1,6 @@ from typing import Any, Dict +from fund_public_goods.agents.researcher.models.answer import Answer +from fund_public_goods.agents.researcher.models.project import Project from supabase import Client, PostgrestAPIResponse import uuid @@ -27,3 +29,30 @@ def get_projects(db: Client) -> PostgrestAPIResponse[Dict[str, Any]]: ) .execute() ) + +def fetch_projects_data(supabase: Client) -> list[Project]: + response = get_projects(supabase) + + projects: list[Project] = [] + + for item in response.data: + answers: list[Answer] = [] + + for application in item.get("applications", []): + for answer in application.get("answers", []): + answers.append(Answer( + question=answer.get("question", ""), + answer=answer.get("answer", None) + )) + + project = Project( + id=item.get("id", ""), + title=item.get("title", ""), + description=item.get("description", ""), + website=item.get("website", ""), + answers=answers, + ) + + projects.append(project) + + return projects \ No newline at end of file diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py index 70d8bcb..9f96e3b 100644 --- a/workers/fund_public_goods/workers/functions/create_strategy.py +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -2,6 +2,7 @@ from fund_public_goods.agents.researcher.functions.evaluate_projects import ( evaluate_projects, ) +from fund_public_goods.agents.researcher.functions.get_top_matching_projects import get_top_matching_projects from fund_public_goods.agents.researcher.models.answer import Answer from fund_public_goods.agents.researcher.models.evaluated_project import ( EvaluatedProject, @@ -9,7 +10,7 @@ from fund_public_goods.agents.researcher.models.project import Project from fund_public_goods.agents.researcher.models.weighted_project import WeightedProject import inngest -from fund_public_goods.db.tables.projects import get_projects +from fund_public_goods.db.tables.projects import fetch_projects_data from fund_public_goods.db.tables.runs import get_prompt from fund_public_goods.db.tables.strategy_entries import insert_multiple from fund_public_goods.workers.events import CreateStrategyEvent @@ -22,32 +23,11 @@ step_names: list[StepNames] = ["FETCH_PROJECTS", "EVALUATE_PROJECTS", "ANALYZE_FUNDING", "SYNTHESIZE_RESULTS"] -def fetch_projects_data(supabase: Client) -> list[Project]: - response = get_projects(supabase) +def fetch_matching_projects(supabase: Client, prompt: str): + projects = fetch_projects_data(supabase) + matching_projects = get_top_matching_projects(prompt, projects) - projects: list[Project] = [] - - for item in response.data: - answers: list[Answer] = [] - - for application in item.get("applications", []): - for answer in application.get("answers", []): - answers.append(Answer( - question=answer.get("question", ""), - answer=answer.get("answer", None) - )) - - project = Project( - id=item.get("id", ""), - title=item.get("title", ""), - description=item.get("description", ""), - website=item.get("website", ""), - answers=answers, - ) - - projects.append(project) - - return projects + return [project.model_dump() for project in matching_projects] def initialize_logs(supabase: Client, run_id: str) -> dict[StepNames, str]: @@ -90,10 +70,10 @@ async def create_strategy( ) json_projects = await step.run( - "fetch_projects_data", lambda: fetch_projects_data(supabase) + "fetch_projects_data", lambda: fetch_matching_projects(supabase, prompt) ) - projects = [Project(**json_project) for json_project in json_projects] # type: ignore + projects = [Project(**json_project) for json_project in json_projects] await step.run( "completed_fetch_projects_data", @@ -101,7 +81,7 @@ async def create_strategy( db=supabase, status="COMPLETED", log_id=log_ids["FETCH_PROJECTS"], - value=f"Found {len(projects)} projects", + value=f"Found {len(projects)} projects related to '{prompt}'", ), ) From 599369053459321bd535c7e8358206075f31591f Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 18:48:50 +0100 Subject: [PATCH 09/17] All functions in steps to avoid repetitions --- .../workers/functions/create_strategy.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/workers/fund_public_goods/workers/functions/create_strategy.py b/workers/fund_public_goods/workers/functions/create_strategy.py index 9f96e3b..58ef687 100644 --- a/workers/fund_public_goods/workers/functions/create_strategy.py +++ b/workers/fund_public_goods/workers/functions/create_strategy.py @@ -1,3 +1,4 @@ +import json from fund_public_goods.agents.researcher.functions.assign_weights import assign_weights from fund_public_goods.agents.researcher.functions.evaluate_projects import ( evaluate_projects, @@ -30,7 +31,7 @@ def fetch_matching_projects(supabase: Client, prompt: str): return [project.model_dump() for project in matching_projects] -def initialize_logs(supabase: Client, run_id: str) -> dict[StepNames, str]: +def initialize_logs(supabase: Client, run_id: str) -> str: log_ids: dict[StepNames, str] = {} for step_name in step_names: @@ -42,7 +43,7 @@ def initialize_logs(supabase: Client, run_id: str) -> dict[StepNames, str]: log_ids[step_name] = new_log[0]["id"] - return log_ids + return json.dumps(log_ids) @inngest.create_function( fn_id="on_create_strategy", @@ -56,8 +57,17 @@ async def create_strategy( run_id = data.run_id supabase = client.create_admin() - prompt = get_prompt(supabase, run_id) - log_ids = initialize_logs(supabase, run_id) + prompt = await step.run( + "extract_prompt", + lambda: get_prompt(supabase, run_id), + ) + + log_ids_str = await step.run( + "initialize_logs", + lambda: initialize_logs(supabase, run_id), + ) + + log_ids: dict[StepNames, str] = json.loads(log_ids_str) await step.run( "start_fetch_projects_data", From 3b190b54fe7e1b4d0a2f803c06f6ed1e204e8a11 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 20:27:31 +0100 Subject: [PATCH 10/17] Merge conflict fixes --- workers/fund_public_goods/db/tables/projects.py | 4 ++-- workers/fund_public_goods/lib/strategy/utils/utils.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/workers/fund_public_goods/db/tables/projects.py b/workers/fund_public_goods/db/tables/projects.py index 7e339d4..3b32c98 100644 --- a/workers/fund_public_goods/db/tables/projects.py +++ b/workers/fund_public_goods/db/tables/projects.py @@ -1,6 +1,6 @@ from typing import Any, Dict -from fund_public_goods.agents.researcher.models.answer import Answer -from fund_public_goods.agents.researcher.models.project import Project +from fund_public_goods.lib.strategy.models.answer import Answer +from fund_public_goods.lib.strategy.models.project import Project from supabase import Client, PostgrestAPIResponse import uuid diff --git a/workers/fund_public_goods/lib/strategy/utils/utils.py b/workers/fund_public_goods/lib/strategy/utils/utils.py index 36a9d68..4cd7a31 100644 --- a/workers/fund_public_goods/lib/strategy/utils/utils.py +++ b/workers/fund_public_goods/lib/strategy/utils/utils.py @@ -1,4 +1,4 @@ -from fund_public_goods.agents.researcher.models.project import Project +from fund_public_goods.lib.strategy.models.project import Project def stringify_projects(projects: list[Project], separator: str) -> str: From 08839b116cf1649a066a72d4b80052552923d7f7 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Thu, 25 Jan 2024 23:40:28 +0100 Subject: [PATCH 11/17] Progress page styling --- web/app/[runId]/progress/page.tsx | 2 +- web/components/LoadingCircle.tsx | 5 ++- web/components/RealtimeLogs.tsx | 67 +++++++++++++++++++++---------- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/web/app/[runId]/progress/page.tsx b/web/app/[runId]/progress/page.tsx index b9ccbb8..2be3fa7 100644 --- a/web/app/[runId]/progress/page.tsx +++ b/web/app/[runId]/progress/page.tsx @@ -6,7 +6,7 @@ export default function ProgressPage(props: { } }) { return ( -
+
) diff --git a/web/components/LoadingCircle.tsx b/web/components/LoadingCircle.tsx index 86ce915..c36a717 100644 --- a/web/components/LoadingCircle.tsx +++ b/web/components/LoadingCircle.tsx @@ -3,9 +3,10 @@ import clsx from "clsx"; interface LoadingCircleProps { strokeWidth?: number; className?: string; + hideText?: boolean } -const LoadingCircle = ({ strokeWidth = 12, className }: LoadingCircleProps) => { +const LoadingCircle = ({ strokeWidth = 12, className, hideText }: LoadingCircleProps) => { return (
{ strokeLinecap="round" /> - Loading... + { hideText ? <> : Loading... }
); }; diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx index 3fe6285..87ec274 100644 --- a/web/components/RealtimeLogs.tsx +++ b/web/components/RealtimeLogs.tsx @@ -2,8 +2,10 @@ import { Tables } from "@/supabase/dbTypes"; import { createSupabaseBrowserClient } from "@/utils/supabase-browser"; -import router from "next/router"; +import clsx from "clsx"; +import { useRouter } from "next/navigation"; import { useState, useEffect } from "react"; +import LoadingCircle from "./LoadingCircle"; const UNSTARTED_TEXTS: Record["step_name"], string> = { FETCH_PROJECTS: "Search for relevant projects", @@ -13,10 +15,10 @@ const UNSTARTED_TEXTS: Record["step_name"], string> = { } const LOADING_TEXTS: Record["step_name"], string> = { - FETCH_PROJECTS: "Searching for relevant projects", - EVALUATE_PROJECTS: "Evaluating proof of impact", - ANALYZE_FUNDING: "Analyzing funding needs", - SYNTHESIZE_RESULTS: "Synthesizing results", + FETCH_PROJECTS: "Searching for relevant projects...", + EVALUATE_PROJECTS: "Evaluating proof of impact...", + ANALYZE_FUNDING: "Analyzing funding needs...", + SYNTHESIZE_RESULTS: "Synthesizing results...", } const STEPS_ORDER: Record["step_name"], number> = { @@ -44,7 +46,7 @@ export default function RealtimeLogs(props: { }) { const [logs, setLogs] = useState[]>(props.logs) const supabase = createSupabaseBrowserClient(); - const runId = props.run + const router = useRouter() const sortedLogsWithSteps = logs.sort((a, b) => { return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name] @@ -59,39 +61,60 @@ export default function RealtimeLogs(props: { event: "UPDATE", table: "logs", schema: "public", - filter: `run_id=eq.${runId}`, + filter: `run_id=eq.${props.run.id}`, }, (payload: { new: Tables<"logs"> }) => { - const logsMinusTheUpdatedOne = logs.filter(log => log.id !== payload.new.id) - setLogs([...logsMinusTheUpdatedOne, payload.new]) + const updatedLogs = logs.map(log => { + if (log.id === payload.new.id) { + log = payload.new + } + + return log + }) + setLogs([...updatedLogs]) if (payload.new.step_name === "SYNTHESIZE_RESULTS" && payload.new.status === "COMPLETED") { - router.push(`/${runId}/strategy`) + router.push(`/${props.run.id}/strategy`) return; } } ) - .subscribe(); + .subscribe() return () => { supabase.removeChannel(channel); }; - }, [supabase, runId]); + }, [supabase, props.run.id]); return ( -
-
- Results for: -
+
+
+

Results for:

+
{props.run.prompt}
-
- { sortedLogsWithSteps.map(logWithStep => ( -
- { getLogMessage(logWithStep) } -
- )) } +
+
+

Results:

+
+ { sortedLogsWithSteps.map(log => ( +
+ { log.status === "IN_PROGRESS" ? : <>} +

{ getLogMessage(log) }

+
+
+ )) } +
) From 43fe89900e28ad9804e1cf656dc648db884f8c8d Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Fri, 26 Jan 2024 00:40:06 +0100 Subject: [PATCH 12/17] Progress page working --- .../strategy => strategy/[id]}/page.tsx | 5 +- .../[id]}/progress/page.tsx | 4 +- web/components/Logs.tsx | 1 - web/components/RealtimeLogs.tsx | 60 ++++++++++++++----- 4 files changed, 49 insertions(+), 21 deletions(-) rename web/app/{[runId]/strategy => strategy/[id]}/page.tsx (88%) rename web/app/{[runId] => strategy/[id]}/progress/page.tsx (76%) diff --git a/web/app/[runId]/strategy/page.tsx b/web/app/strategy/[id]/page.tsx similarity index 88% rename from web/app/[runId]/strategy/page.tsx rename to web/app/strategy/[id]/page.tsx index 1184c5b..d25a590 100644 --- a/web/app/[runId]/strategy/page.tsx +++ b/web/app/strategy/[id]/page.tsx @@ -7,7 +7,6 @@ export default async function StrategyPage({ }: { params: { id: string }; }) { - const workerId = params.id; const supabase = createSupabaseServerClient(); // // Fetch the runs for this worker @@ -25,13 +24,13 @@ export default async function StrategyPage({ ) ` ) - .eq("worker_id", workerId) + .eq("id", params.id) .order("created_at", { ascending: false }) .single(); if (runs.error || !runs.data) { console.error(runs.error); - throw Error(`Runs with worker_id ${workerId} not found.`); + throw Error(`Runs with id ${params.id} not found.`); } const data = runs.data.strategy_entries as unknown as StrategyWithProjects; diff --git a/web/app/[runId]/progress/page.tsx b/web/app/strategy/[id]/progress/page.tsx similarity index 76% rename from web/app/[runId]/progress/page.tsx rename to web/app/strategy/[id]/progress/page.tsx index 2be3fa7..d7110a5 100644 --- a/web/app/[runId]/progress/page.tsx +++ b/web/app/strategy/[id]/progress/page.tsx @@ -2,12 +2,12 @@ import Logs from "@/components/Logs"; export default function ProgressPage(props: { params: { - runId: string + id: string } }) { return (
- +
) } \ No newline at end of file diff --git a/web/components/Logs.tsx b/web/components/Logs.tsx index 80b2c47..6e48727 100644 --- a/web/components/Logs.tsx +++ b/web/components/Logs.tsx @@ -19,7 +19,6 @@ export default async function Logs(props: { runId: string }) { step_name ) `).eq("id", props.runId).single() - if (!run) { throw new Error(`Run with ID '${props.runId}' not found`) diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx index 87ec274..b53f621 100644 --- a/web/components/RealtimeLogs.tsx +++ b/web/components/RealtimeLogs.tsx @@ -6,6 +6,8 @@ import clsx from "clsx"; import { useRouter } from "next/navigation"; import { useState, useEffect } from "react"; import LoadingCircle from "./LoadingCircle"; +import TextField from "./TextField"; +import Button from "./Button"; const UNSTARTED_TEXTS: Record["step_name"], string> = { FETCH_PROJECTS: "Search for relevant projects", @@ -37,6 +39,16 @@ const getLogMessage = (log: Tables<"logs">) => { } } +const checkIfFinished = (logs: Tables<"logs">[]) => { + const sortedLogs = logs.sort((a, b) => { + return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name] + }) + const lastStep = sortedLogs.slice(-1)[0]; + const isFinished = lastStep.status === "COMPLETED" && lastStep.step_name === "SYNTHESIZE_RESULTS" + + return isFinished +} + export default function RealtimeLogs(props: { logs: Tables<"logs">[] run: { @@ -52,6 +64,12 @@ export default function RealtimeLogs(props: { return STEPS_ORDER[a.step_name] - STEPS_ORDER[b.step_name] }) + const isFinished = checkIfFinished(sortedLogsWithSteps) + + const navigateToStrategy = () => { + router.push(`../${props.run.id}`) + } + useEffect(() => { const channel = supabase .channel("logs-added") @@ -63,18 +81,26 @@ export default function RealtimeLogs(props: { schema: "public", filter: `run_id=eq.${props.run.id}`, }, - (payload: { new: Tables<"logs"> }) => { - const updatedLogs = logs.map(log => { - if (log.id === payload.new.id) { - log = payload.new - } - - return log - }) + async () => { + const response = await supabase.from("logs").select(` + id, + run_id, + created_at, + value, + ended_at, + status, + step_name + `).eq("run_id", props.run.id) + const updatedLogs = response.data + + if (!updatedLogs) { + throw new Error(`Logs for Run with ID '${props.run.id}' not found`) + } + setLogs([...updatedLogs]) - if (payload.new.step_name === "SYNTHESIZE_RESULTS" && payload.new.status === "COMPLETED") { - router.push(`/${props.run.id}/strategy`) + if (checkIfFinished(updatedLogs)) { + navigateToStrategy() return; } } @@ -89,10 +115,11 @@ export default function RealtimeLogs(props: { return (
-

Results for:

-
- {props.run.prompt} -
+
@@ -100,7 +127,7 @@ export default function RealtimeLogs(props: {
{ sortedLogsWithSteps.map(log => (
{ log.status === "IN_PROGRESS" ? : <>} @@ -116,6 +143,9 @@ export default function RealtimeLogs(props: { )) }
+
) } \ No newline at end of file From 0844e6512d3e8bc5e020543919b6202cf62d5da4 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Fri, 26 Jan 2024 00:50:16 +0100 Subject: [PATCH 13/17] Changed URL pattern --- web/app/{strategy => r}/[id]/page.tsx | 0 web/app/{strategy => r}/[id]/progress/page.tsx | 0 web/components/RealtimeLogs.tsx | 2 +- 3 files changed, 1 insertion(+), 1 deletion(-) rename web/app/{strategy => r}/[id]/page.tsx (100%) rename web/app/{strategy => r}/[id]/progress/page.tsx (100%) diff --git a/web/app/strategy/[id]/page.tsx b/web/app/r/[id]/page.tsx similarity index 100% rename from web/app/strategy/[id]/page.tsx rename to web/app/r/[id]/page.tsx diff --git a/web/app/strategy/[id]/progress/page.tsx b/web/app/r/[id]/progress/page.tsx similarity index 100% rename from web/app/strategy/[id]/progress/page.tsx rename to web/app/r/[id]/progress/page.tsx diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx index b53f621..e5efbc3 100644 --- a/web/components/RealtimeLogs.tsx +++ b/web/components/RealtimeLogs.tsx @@ -67,7 +67,7 @@ export default function RealtimeLogs(props: { const isFinished = checkIfFinished(sortedLogsWithSteps) const navigateToStrategy = () => { - router.push(`../${props.run.id}`) + router.push(`./`) } useEffect(() => { From e1a8e6a5c10c27d9a239792fbed0e6be32c57266 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Fri, 26 Jan 2024 01:01:42 +0100 Subject: [PATCH 14/17] Moved layout logic to server component --- web/app/r/[id]/progress/page.tsx | 29 ++++++++++++++++++++++++++++- web/components/RealtimeLogs.tsx | 13 ++----------- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/web/app/r/[id]/progress/page.tsx b/web/app/r/[id]/progress/page.tsx index d7110a5..c4e5cbc 100644 --- a/web/app/r/[id]/progress/page.tsx +++ b/web/app/r/[id]/progress/page.tsx @@ -1,4 +1,25 @@ import Logs from "@/components/Logs"; +import TextField from "@/components/TextField"; +import { createSupabaseServerClient } from "@/utils/supabase-server"; + +async function PromptField(props: { runId: string }) { + const supabase = createSupabaseServerClient() + + const { data: run } = await supabase.from('runs').select(` + id, + prompt + `).eq("id", props.runId).single() + + if (!run) { + throw new Error(`Run with ID '${props.runId}' not found`) + } + + return +} export default function ProgressPage(props: { params: { @@ -7,7 +28,13 @@ export default function ProgressPage(props: { }) { return (
- +
+
+ +
+
+ +
) } \ No newline at end of file diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx index e5efbc3..d0a6c4f 100644 --- a/web/components/RealtimeLogs.tsx +++ b/web/components/RealtimeLogs.tsx @@ -6,7 +6,6 @@ import clsx from "clsx"; import { useRouter } from "next/navigation"; import { useState, useEffect } from "react"; import LoadingCircle from "./LoadingCircle"; -import TextField from "./TextField"; import Button from "./Button"; const UNSTARTED_TEXTS: Record["step_name"], string> = { @@ -113,15 +112,7 @@ export default function RealtimeLogs(props: { }, [supabase, props.run.id]); return ( -
-
- -
-
+ <>

Results:

@@ -146,6 +137,6 @@ export default function RealtimeLogs(props: { -
+ ) } \ No newline at end of file From 1b47cad523ce56557a4accbc6a8a034f6e5fca38 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Fri, 26 Jan 2024 01:10:15 +0100 Subject: [PATCH 15/17] Corrected navigation from main screen prompt field --- web/components/Prompt.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/components/Prompt.tsx b/web/components/Prompt.tsx index e5aaf9c..ae74dd3 100644 --- a/web/components/Prompt.tsx +++ b/web/components/Prompt.tsx @@ -28,7 +28,7 @@ export default function Prompt() { setIsWaiting(true); try { const response = await startWorker(prompt); - router.push(`/${response.runId}/progress`) + router.push(`/r/${response.runId}/progress`) } finally { setIsWaiting(false); } From af485ee2aa122190b8a86e820c6a4cca89594d22 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Fri, 26 Jan 2024 01:13:11 +0100 Subject: [PATCH 16/17] Added key to iteration in react --- web/components/RealtimeLogs.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/components/RealtimeLogs.tsx b/web/components/RealtimeLogs.tsx index d0a6c4f..1030ba3 100644 --- a/web/components/RealtimeLogs.tsx +++ b/web/components/RealtimeLogs.tsx @@ -117,7 +117,7 @@ export default function RealtimeLogs(props: {

Results:

{ sortedLogsWithSteps.map(log => ( -
From 67355814676660dd3be227ad5d0fd692665f7f40 Mon Sep 17 00:00:00 2001 From: Nestor Amesty Date: Fri, 26 Jan 2024 12:35:55 +0100 Subject: [PATCH 17/17] Added DB migration script --- web/supabase/migrations/20240126113522_logs.sql | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 web/supabase/migrations/20240126113522_logs.sql diff --git a/web/supabase/migrations/20240126113522_logs.sql b/web/supabase/migrations/20240126113522_logs.sql new file mode 100644 index 0000000..a7cacc6 --- /dev/null +++ b/web/supabase/migrations/20240126113522_logs.sql @@ -0,0 +1,14 @@ + +create type "public"."step_name" as enum ('FETCH_PROJECTS', 'EVALUATE_PROJECTS', 'ANALYZE_FUNDING', 'SYNTHESIZE_RESULTS'); + +create type "public"."step_status" as enum ('NOT_STARTED', 'IN_PROGRESS', 'COMPLETED', 'ERRORED'); + +alter table "public"."logs" drop column "message"; + +alter table "public"."logs" add column "ended_at" timestamp with time zone; + +alter table "public"."logs" add column "status" step_status not null; + +alter table "public"."logs" add column "step_name" step_name not null; + +alter table "public"."logs" add column "value" text; \ No newline at end of file