diff --git a/.gitignore b/.gitignore index 401453a..407a74a 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,62 @@ get_helm.sh .ruff_cache data/minio eval.txt -.DS_Store \ No newline at end of file +.DS_Store + +# Ignore generated or temporary files managed by the Workbench +.project/* +!.project/spec.yaml +!.project/configpacks + +# General ignores + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Temp directories, notebooks created by jupyterlab +.ipynb_checkpoints +.Trash-*/ +.jupyter/ +.local/ + +# Python distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Workbench Project Layout +frontend/demo_outputs/*.mp3 +frontend/demo_outputs/*.json +frontend/output.log diff --git a/.project/configpacks b/.project/configpacks new file mode 100644 index 0000000..f7247e3 --- /dev/null +++ b/.project/configpacks @@ -0,0 +1,10 @@ +*defaults.ContainerUser +*bash.PreBuild +*defaults.EnvVars +*defaults.Readme +*defaults.Entrypoint +*apt.PackageManager +*bash.PreLanguage +*python.PipPackageManager +*bash.PostBuild +*jupyterlab.JupyterLab \ No newline at end of file diff --git a/.project/spec.yaml b/.project/spec.yaml new file mode 100644 index 0000000..f9c0995 --- /dev/null +++ b/.project/spec.yaml @@ -0,0 +1,133 @@ +specVersion: v2 +specMinorVersion: 2 +meta: + name: pdf-to-podcast + image: project-pdf-to-podcast + description: "" + labels: [] + createdOn: "2024-12-06T19:19:11Z" + defaultBranch: main +layout: +- path: docs/ + type: code + storage: git +- path: samples/ + type: code + storage: git +- path: services/ + type: code + storage: git +- path: shared/ + type: code + storage: git +- path: tests/ + type: code + storage: git +- path: launchable/ + type: code + storage: git +- path: frontend/ + type: code + storage: git +environment: + base: + registry: nvcr.io + image: nvidia/ai-workbench/python-basic:1.0.2 + build_timestamp: "20241001182612" + name: Python Basic + supported_architectures: [] + cuda_version: "" + description: A Python Base with Jupyterlab + entrypoint_script: "" + labels: + - ubuntu + - python3 + - jupyterlab + apps: + - name: jupyterlab + type: jupyterlab + class: webapp + start_command: jupyter lab --allow-root --port 8888 --ip 0.0.0.0 --no-browser + --NotebookApp.base_url=\$PROXY_PREFIX --NotebookApp.default_url=/lab --NotebookApp.allow_origin='*' + health_check_command: '[ \$(echo url=\$(jupyter lab list | head -n 2 | tail + -n 1 | cut -f1 -d'' '' | grep -v ''Currently'' | sed "s@/?@/lab?@g") | curl + -o /dev/null -s -w ''%{http_code}'' --config -) == ''200'' ]' + stop_command: jupyter lab stop 8888 + user_msg: "" + logfile_path: "" + timeout_seconds: 60 + icon_url: "" + webapp_options: + autolaunch: true + port: "8888" + proxy: + trim_prefix: false + url_command: jupyter lab list | head -n 2 | tail -n 1 | cut -f1 -d' ' | grep + -v 'Currently' + programming_languages: + - python3 + icon_url: https://workbench.download.nvidia.com/static/img/ai-workbench-icon-rectangle.jpg + image_version: 1.0.5 + os: linux + os_distro: ubuntu + os_distro_release: "22.04" + schema_version: v2 + user_info: + uid: "" + gid: "" + username: "" + package_managers: + - name: apt + binary_path: /usr/bin/apt + installed_packages: + - curl + - git + - git-lfs + - python3 + - gcc + - python3-dev + - python3-pip + - vim + - name: pip + binary_path: /usr/bin/pip + installed_packages: + - jupyterlab==4.2.4 + package_manager_environment: + name: "" + target: "" +execution: + apps: + - name: frontend + type: custom + class: webapp + start_command: PROXY_PREFIX=$PROXY_PREFIX python3 -m frontend + health_check_command: curl -f "http://localhost:7860/" + stop_command: pkill -f "^python3 -m frontend" + user_msg: "" + logfile_path: "" + timeout_seconds: 30 + icon_url: "" + webapp_options: + autolaunch: true + port: "7860" + proxy: + trim_prefix: false + url: http://localhost:7860/ + resources: + gpu: + requested: 0 + sharedMemoryMB: 0 + secrets: + - variable: ELEVENLABS_API_KEY + description: "" + - variable: NVIDIA_API_KEY + description: "" + mounts: + - type: project + target: /project/ + description: Project directory + options: rw + - type: volume + target: /nvwb-shared-volume/ + description: "" + options: volumeName=nvwb-shared-volume diff --git a/README.md b/README.md index 5b786c9..3241d99 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ The blueprint accepts a Target PDF and optionally multiple Context PDFs. The Tar For more information about the PDF, Agent and TTS service flows, please refer to the mermaid [diagram](docs/README.md) +| :exclamation: Important | +| :-----------------------| +| Users running this blueprint with [NVIDIA AI Workbench](https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/workbench/) should skip to the quickstart section [here](https://github.com/nv-edwli/pdf-to-podcast/tree/main/workbench#quickstart)! | + ## Software Components - NVIDIA NIM microservices - Response generation (Inference) diff --git a/docker-compose.yaml b/docker-compose.yaml index eef914e..7fd7f66 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,4 +1,28 @@ services: + local-nim: + image: nvcr.io/nim/meta/llama-3.1-8b-instruct:latest + runtime: nvidia + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [ gpu ] + ports: + - "8000:8000" + volumes: + - type: bind + source: ~/.cache/nim + target: /opt/nim/.cache/ + environment: + - NIM_MODEL_PROFILE=193649a2eb95e821309d6023a2cabb31489d3b690a9973c7ab5d1ff58b0aa7eb + - NGC_API_KEY=${NVIDIA_API_KEY:?Error NVIDIA_API_KEY not set} + networks: + - app-network + profiles: + - local + redis: image: redis:latest ports: @@ -16,7 +40,7 @@ services: - MINIO_ROOT_USER=minioadmin - MINIO_ROOT_PASSWORD=minioadmin volumes: - - ./data/minio:/data + - minio_data:/data command: minio server /data --console-address ":9001" networks: - app-network @@ -150,6 +174,7 @@ services: volumes: redis_data: pdf_temp: + minio_data: networks: app-network: diff --git a/docs/.gitkeep b/docs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/frontend/.gitkeep b/frontend/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/frontend/__init__.py b/frontend/__init__.py new file mode 100644 index 0000000..d50bcc4 --- /dev/null +++ b/frontend/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/frontend/__main__.py b/frontend/__main__.py new file mode 100644 index 0000000..b8d8eb7 --- /dev/null +++ b/frontend/__main__.py @@ -0,0 +1,273 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gradio as gr +import requests +import tempfile +import os +import base64 +import json +import sys +import ast +import re + +import uuid +from frontend.utils import email_demo, logger + +BP_INFO_MARKDOWN=""" +### Key Features + +PDF to Markdown Service + + * Extracts content from PDFs and converts it into markdown format for further processing. + +Monologue or Dialogue Creation Service + + * AI processes markdown content, enriching or structuring it to create natural and engaging audio content. + +Text-to-Speech (TTS) Service + + * Converts the processed content into high-quality speech. + +""" + +CONFIG_INSTRUCTIONS_MARKDOWN=""" +Use this editor to configure your long-reasoning agent. + +**Note:** The default configuration is for Build endpoints on the NVIDIA API Catalog. To use a local agent, ensure the compose services are running with the ``local`` profile. + +**Example**: Using a _locally_ running ``llama-3.1-8b-instruct`` NVIDIA NIM + +``` +{ + "reasoning": { + "name": "meta/llama-3.1-8b-instruct", + "api_base": "http://pdf-to-podcast-local-nim-1:8000/v1" + }, + ... +} +``` + +""" + +css = """ +#editor-title { + margin: auto; + width: 100%; + text-align: center; +} +""" + +js_func = """ +function refresh() { + const url = new URL(window.location); + if (url.searchParams.get('__theme') !== 'dark') { + url.searchParams.set('__theme', 'dark'); + window.location.href = url.href; + } +} +""" + +_CONFIG_CHANGES_JS = """ +async() => { + title = document.querySelector("div#config-toolbar p"); + if (! title.innerHTML.endsWith("๐ŸŸ ")) { title.innerHTML = title.innerHTML.slice(0,-2) + "๐ŸŸ "; }; +} +""" +_SAVE_CHANGES_JS = """ +async() => { + title = document.querySelector("div#config-toolbar p"); + if (! title.innerHTML.endsWith("๐ŸŸข")) { title.innerHTML = title.innerHTML.slice(0,-2) + "๐ŸŸข"; }; +} +""" +_SAVE_IMG = "https://media.githubusercontent.com/media/NVIDIA/nim-anywhere/main/code/frontend/_static/images/floppy.png" +_UNDO_IMG = "https://media.githubusercontent.com/media/NVIDIA/nim-anywhere/main/code/frontend/_static/images/undo.png" +_HISTORY_IMG = "https://media.githubusercontent.com/media/NVIDIA/nim-anywhere/main/code/frontend/_static/images/history.png" +_PSEUDO_FILE_NAME = "models.json ๐ŸŸข" +with open("/project/models.json", "r", encoding="UTF-8") as config_file: + _STARTING_CONFIG = config_file.read() + +sys.stdout = logger.Logger("/project/frontend/output.log") + +# Gradio Interface +with gr.Blocks(css=css, js=js_func) as demo: + gr.Markdown("# NVIDIA AI Blueprint: PDF-to-Podcast") + + with gr.Row(): + with gr.Column(scale=1): + + with gr.Tab("Full End to End Flow"): + gr.Markdown("### Upload at least one PDF file for a file to target or as context. ") + with gr.Row(): + target_files = gr.File(label="Upload target PDF", file_types=[".pdf"]) + context_files = gr.File(label="Upload context PDF", file_types=[".pdf"], file_count="multiple") + with gr.Row(): + settings = gr.CheckboxGroup( + ["Monologue Only"], label="Additional Settings", info="Customize your podcast here" + ) + with gr.Accordion("Optional: Email Details", open=False): + gr.Markdown("Enter a recipient email here to receive your generated podcast in your inbox! \n\n**Note**: Ensure `SENDER_EMAIL` and `SENDER_EMAIL_PASSWORD` are configured in AI Workbench") + recipient_email = gr.Textbox(label="Recipient email", placeholder="Enter email here") + + generate_button = gr.Button("Generate Podcast") + + with gr.Tab("Agent Configurations"): + gr.Markdown(CONFIG_INSTRUCTIONS_MARKDOWN) + with gr.Row(elem_id="config-row"): + with gr.Column(): + with gr.Group(elem_id="config-wrapper"): + with gr.Row(elem_id="config-toolbar", elem_classes=["toolbar"]): + file_title = gr.Markdown(_PSEUDO_FILE_NAME, elem_id="editor-title") + save_btn = gr.Button("", icon=_SAVE_IMG, elem_classes=["toolbar"]) + undo_btn = gr.Button("", icon=_UNDO_IMG, elem_classes=["toolbar"]) + reset_btn = gr.Button("", icon=_HISTORY_IMG, elem_classes=["toolbar"]) + with gr.Row(elem_id="config-row-box"): + editor = gr.Code( + elem_id="config-editor", + interactive=True, + language="json", + show_label=False, + container=False, + ) + + with gr.Tab("Architecture Diagram"): + gr.Markdown(BP_INFO_MARKDOWN) + gr.Image(value="frontend/static/diagram.png") + + with gr.Column(scale=1): + gr.Markdown("
") + output = gr.Textbox(label="Outputs", placeholder="Outputs will show here when executing", max_lines=20, lines=20) + output_file = gr.File(visible=False, interactive=False) + transcript_file = gr.File(visible=False, interactive=False) + history_file = gr.File(visible=False, interactive=False) + + demo.load(logger.read_logs, None, output, every=1) + + def read_chain_config() -> str: + """Read the chain config file.""" + with open("/project/models.json", "r", encoding="UTF-8") as cf: + return cf.read() + + demo.load(read_chain_config, outputs=editor) + + undo_btn.click(read_chain_config, outputs=editor) + undo_btn.click(None, js=_SAVE_CHANGES_JS) + + @reset_btn.click(outputs=editor) + def reset_demo() -> str: + """Reset the configuration to the starting config.""" + return _STARTING_CONFIG + + @save_btn.click(inputs=editor) + def save_chain_config(config_txt: str) -> None: + """Save the user's config file.""" + # validate json + try: + config_data = json.loads(config_txt) + except Exception as err: + raise SyntaxError(f"Error validating JSON syntax:\n{err}") from err + + # save configuration + with open("/project/models.json", "w", encoding="UTF-8") as cf: + cf.write(config_txt) + + save_btn.click(None, js=_SAVE_CHANGES_JS) + + # %% editor actions + editor.input(None, js=_CONFIG_CHANGES_JS) + + def validate_sender(sender): + if sender is None: + return False + regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + return bool(re.match(regex, sender)) + + def get_transcript(filename, job_id): + service = os.environ["API_SERVICE_URL"] + url = f"{service}/saved_podcast/{job_id}/transcript" + params = {"userId": "test-userid"} + filepath = "/project/frontend/demo_outputs/transcript_" + filename + ".json" + + response = requests.get(url, params=params) + if response.status_code == 200: + json_data = response.json() + with open(filepath, "w") as file: + json.dump(json_data, file) + print(f"JSON data saved to {filepath}") + return filepath + else: + print(f"Error retrieving transcript: {response.status_code}") + return filepath + + def get_history(filename, job_id): + service = os.environ["API_SERVICE_URL"] + url = f"{service}/saved_podcast/{job_id}/history" + params = {"userId": "test-userid"} + filepath = "/project/frontend/demo_outputs/generation_history_" + filename + ".json" + response = requests.get(url, params=params) + + if response.status_code == 200: + json_data = response.json() + with open(filepath, "w") as file: + json.dump(json_data, file) + print(f"JSON data saved to {filepath}") + return filepath + else: + print(f"Error retrieving generation_history: {response.status_code}") + return filepath + + def generate_podcast(target, context, recipient, settings): + if target is None or len(target) == 0: + gr.Warning("Target PDF upload not detected. Please upload a target PDF file and try again. ") + return gr.update(visible=False) + + sender_email = os.environ["SENDER_EMAIL"] if "SENDER_EMAIL" in os.environ else None + + if isinstance(target, str): + target = [target] + if isinstance(context, str): + context = [context] + + base_url = os.environ["API_SERVICE_URL"] + monologue = True if "Monologue Only" in settings else False + vdb = False # True if "Vector Database" in settings else False + filename = str(uuid.uuid4()) + sender_validation = validate_sender(sender_email) + if not sender_validation and len(recipient) > 0: + gr.Warning("SENDER_EMAIL not detected or malformed. Please fix or remove recipient email and try again. You may need to restart the container for Environment Variable changes to take effect. ") + return gr.update(visible=False) + elif sender_validation and len(recipient) > 0 and "SENDER_EMAIL_PASSWORD" not in os.environ: + gr.Warning("SENDER_EMAIL_PASSWORD not detected. Please fix or remove recipient email and try again. You may need to restart the container for Environment Variable changes to take effect. ") + return gr.update(visible=False) + email = [recipient] if (sender_validation and len(recipient) > 0 and "SENDER_EMAIL_PASSWORD" in os.environ) else [filename + "@"] # delimiter + + # Generate podcast + job_id = email_demo.test_api(base_url, target, context, email, monologue, vdb) + + # Send file via email + if sender_validation and len(recipient) > 0 and "SENDER_EMAIL_PASSWORD" in os.environ: + email_demo.send_file_via_email("/project/frontend/demo_outputs/" + recipient.split('@')[0] + "-output.mp3", sender_email, recipient) + return gr.update(value="/project/frontend/demo_outputs/" + recipient.split('@')[0] + "-output.mp3", label="podcast audio", visible=True), gr.update(value=get_transcript(recipient.split('@')[0], job_id), label="podcast transcript", visible=True), gr.update(value=get_history(recipient.split('@')[0], job_id), label="generation history", visible=True) + + return gr.update(value="/project/frontend/demo_outputs/" + filename + "-output.mp3", label="podcast audio", visible=True), gr.update(value=get_transcript(filename, job_id), label="podcast transcript", visible=True), gr.update(value=get_history(filename, job_id), label="generation history", visible=True) + + generate_button.click(generate_podcast, [target_files, + context_files, + recipient_email, + settings], [output_file, transcript_file, history_file]) + +# Launch Gradio app +if __name__ == "__main__": + demo.launch(server_name="0.0.0.0", root_path=os.environ.get("PROXY_PREFIX")) diff --git a/frontend/demo_outputs/.gitkeep b/frontend/demo_outputs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/frontend/output.log b/frontend/output.log new file mode 100644 index 0000000..04f3df5 --- /dev/null +++ b/frontend/output.log @@ -0,0 +1,3 @@ +Running on local URL: http://0.0.0.0:7860 + +To create a public link, set `share=True` in `launch()`. diff --git a/frontend/shared/setup.py b/frontend/shared/setup.py new file mode 100644 index 0000000..81b05e6 --- /dev/null +++ b/frontend/shared/setup.py @@ -0,0 +1,29 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from setuptools import setup, find_packages + +setup( + name="shared", + version="0.1", + packages=find_packages(), + install_requires=[ + "redis", + "pydantic", + "httpx", + "requests", + "langchain-nvidia-ai-endpoints", + ], +) diff --git a/frontend/shared/shared/__init__.py b/frontend/shared/shared/__init__.py new file mode 100644 index 0000000..d50bcc4 --- /dev/null +++ b/frontend/shared/shared/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/frontend/shared/shared/api_types.py b/frontend/shared/shared/api_types.py new file mode 100644 index 0000000..f0471c2 --- /dev/null +++ b/frontend/shared/shared/api_types.py @@ -0,0 +1,120 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pydantic import BaseModel, Field, model_validator +from typing import Optional, Dict, List +from .pdf_types import PDFMetadata +from enum import Enum + + +class JobStatus(str, Enum): + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + + +class ServiceType(str, Enum): + PDF = "pdf" + AGENT = "agent" + TTS = "tts" + + +class StatusUpdate(BaseModel): + job_id: str + status: JobStatus + message: Optional[str] = None + service: Optional[ServiceType] = None + timestamp: Optional[float] = None + data: Optional[dict] = None + + +class StatusResponse(BaseModel): + status: str + result: Optional[str] = None + error: Optional[str] = None + message: Optional[str] = None + + +class TranscriptionParams(BaseModel): + userId: str = Field(..., description="KAS User ID") + name: str = Field(..., description="Name of the podcast") + duration: int = Field(..., description="Duration in minutes") + monologue: bool = Field( + False, description="If True, creates a single-speaker podcast" + ) + speaker_1_name: str = Field( + ..., description="Name of the speaker (or first speaker if not monologue)" + ) + speaker_2_name: Optional[str] = Field( + None, description="Name of the second speaker (not required for monologue)" + ) + voice_mapping: Dict[str, str] = Field( + ..., + description="Mapping of speaker IDs to voice IDs. For monologue, only speaker-1 is required", + example={ + "speaker-1": "iP95p4xoKVk53GoZ742B", + "speaker-2": "9BWtsMINqrJLrRacOk9x", + }, + ) + guide: Optional[str] = Field( + None, description="Optional guidance for the transcription focus and structure" + ) + vdb_task: bool = Field( + False, + description="If True, creates a VDB task when running NV-Ingest allowing for retrieval abilities", + ) + + @model_validator(mode="after") + def validate_monologue_settings(self) -> "TranscriptionParams": + if self.monologue: + # Check speaker_2_name is not provided + if self.speaker_2_name is not None: + raise ValueError( + "speaker_2_name should not be provided for monologue podcasts" + ) + + # Check voice_mapping only contains speaker-1 + if "speaker-2" in self.voice_mapping: + raise ValueError( + "voice_mapping should only contain speaker-1 for monologue podcasts" + ) + + # Check that speaker-1 is present in voice_mapping + if "speaker-1" not in self.voice_mapping: + raise ValueError("voice_mapping must contain speaker-1") + else: + # For dialogues, ensure both speakers are present + if not self.speaker_2_name: + raise ValueError("speaker_2_name is required for dialogue podcasts") + + required_speakers = {"speaker-1", "speaker-2"} + if not all(speaker in self.voice_mapping for speaker in required_speakers): + raise ValueError( + "voice_mapping must contain both speaker-1 and speaker-2 for dialogue podcasts" + ) + + return self + + +class TranscriptionRequest(TranscriptionParams): + pdf_metadata: List[PDFMetadata] + job_id: str + + +class RAGRequest(BaseModel): + query: str = Field(..., description="The search query to process") + k: int = Field(..., description="Number of results to retrieve", ge=1) + job_id: str = Field(..., description="The unique job identifier") diff --git a/frontend/shared/shared/connection.py b/frontend/shared/shared/connection.py new file mode 100644 index 0000000..9168b52 --- /dev/null +++ b/frontend/shared/shared/connection.py @@ -0,0 +1,156 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from fastapi import WebSocket, WebSocketDisconnect +from typing import Dict, Set +import redis +import ujson as json +import logging +import time +import asyncio +from collections import defaultdict +from threading import Thread +import queue + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ConnectionManager: + def __init__(self, redis_client: redis.Redis): + self.active_connections: Dict[str, Set[WebSocket]] = defaultdict(set) + self.pubsub = None + self.message_queue = queue.Queue() + self.redis_thread = None + self.should_stop = False + self.redis_client = redis_client + + async def connect(self, websocket: WebSocket, job_id: str): + await websocket.accept() + self.active_connections[job_id].add(websocket) + logger.info( + f"New WebSocket connection for job {job_id}. Total connections: {len(self.active_connections[job_id])}" + ) + + # Start Redis listener if not already running + if self.redis_thread is None: + self.redis_thread = Thread(target=self._redis_listener) + self.redis_thread.daemon = True + self.redis_thread.start() + # Start the async message processor + asyncio.create_task(self._process_messages()) + + def disconnect(self, websocket: WebSocket, job_id: str): + if job_id in self.active_connections: + self.active_connections[job_id].remove(websocket) + if not self.active_connections[job_id]: + del self.active_connections[job_id] + logger.info( + f"WebSocket disconnected for job {job_id}. Remaining connections: {len(self.active_connections[job_id]) if job_id in self.active_connections else 0}" + ) + + def _redis_listener(self): + """Redis subscription running in a separate thread""" + try: + self.pubsub = self.redis_client.pubsub(ignore_subscribe_messages=True) + self.pubsub.subscribe("status_updates:all") + logger.info("Successfully subscribed to Redis status_updates:all channel") + + while not self.should_stop: + message = self.pubsub.get_message() + if message and message["type"] == "message" and "data" in message: + data = message["data"].decode("utf-8") + try: + job_update = json.loads(data) + logger.info( + f"Received message for job {job_update.get('job_id')}" + ) + except json.JSONDecodeError: + logger.error("Invalid JSON in Redis message") + # Put message in queue for processing regardless of logging logic error + finally: + self.message_queue.put(data) + time.sleep(0.01) # Prevent tight loop + + except Exception as e: + logger.error(f"Redis subscription error: {e}") + finally: + if self.pubsub: + self.pubsub.unsubscribe() + self.pubsub.close() + + async def _process_messages(self): + """Async task to process messages from the queue and broadcast them""" + while True: + try: + # Check queue in a non-blocking way + while not self.message_queue.empty(): + message: str = self.message_queue.get_nowait() + try: + update = json.loads(message) + job_id = update.get("job_id") + logger.info(f"Processing message for job {job_id}") + + if job_id and job_id in self.active_connections: + logger.info( + f"Broadcasting update for job {job_id} to {len(self.active_connections[job_id])} connections" + ) + await self.broadcast_to_job( + job_id, + { + "service": update.get("service"), + "status": update.get("status"), + "message": update.get("message", ""), + }, + ) + logger.info( + f"Broadcasted update for job {job_id}: {update.get('service')} - {update.get('status')}" + ) + except json.JSONDecodeError: + logger.error(f"Invalid JSON in Redis message: {message}") + except Exception as e: + logger.error(f"Error processing Redis message: {e}") + + # Small delay before next check + await asyncio.sleep(0.01) + + except Exception as e: + logger.error(f"Message processing error: {e}") + await asyncio.sleep(1) + + async def broadcast_to_job(self, job_id: str, message: dict): + """Send message to all WebSocket connections for a job""" + if job_id in self.active_connections: + disconnected = set() + for connection in self.active_connections[job_id]: + try: + await connection.send_json(message) + except WebSocketDisconnect: + disconnected.add(connection) + except Exception as e: + logger.error(f"Error sending message to WebSocket: {e}") + disconnected.add(connection) + + # Clean up disconnected clients + for connection in disconnected: + self.disconnect(connection, job_id) + + def cleanup(self): + """Cleanup resources""" + self.should_stop = True + if self.redis_thread: + self.redis_thread.join(timeout=1.0) + if self.pubsub: + self.pubsub.close() diff --git a/frontend/shared/shared/job.py b/frontend/shared/shared/job.py new file mode 100644 index 0000000..f4a3ac1 --- /dev/null +++ b/frontend/shared/shared/job.py @@ -0,0 +1,126 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from shared.api_types import ServiceType +from shared.otel import OpenTelemetryInstrumentation +import redis +import time +import ujson as json +import threading + + +class JobStatusManager: + def __init__( + self, + service_type: ServiceType, + telemetry: OpenTelemetryInstrumentation, + redis_url="redis://redis:6379", + ): + self.telemetry = telemetry + self.redis = redis.Redis.from_url(redis_url, decode_responses=False) + self.service_type = service_type + self._lock = threading.Lock() + + def create_job(self, job_id: str): + with self.telemetry.tracer.start_as_current_span("job.create_job") as span: + span.set_attribute("job_id", job_id) + update = { + "job_id": job_id, + "status": "pending", + "message": "Job created", + "service": self.service_type, + "timestamp": time.time(), + } + # Encode the update dict as JSON bytes + hset_key = f"status:{job_id}:{str(self.service_type)}" + span.set_attribute("hset_key", hset_key) + self.redis.hset( + hset_key, + mapping={k: str(v).encode() for k, v in update.items()}, + ) + self.redis.publish("status_updates:all", json.dumps(update).encode()) + + def update_status(self, job_id: str, status: str, message: str): + with self.telemetry.tracer.start_as_current_span("job.update_status") as span: + span.set_attribute("job_id", job_id) + update = { + "job_id": job_id, + "status": status, + "message": message, + "service": self.service_type, + "timestamp": time.time(), + } + # Encode the update dict as JSON bytes + hset_key = f"status:{job_id}:{str(self.service_type)}" + span.set_attribute("hset_key", hset_key) + self.redis.hset( + hset_key, + mapping={k: str(v).encode() for k, v in update.items()}, + ) + self.redis.publish("status_updates:all", json.dumps(update).encode()) + + def set_result(self, job_id: str, result: bytes): + with self.telemetry.tracer.start_as_current_span("job.set_result") as span: + span.set_attribute("job_id", job_id) + set_key = f"result:{job_id}:{str(self.service_type)}" + span.set_attribute("set_key", set_key) + self.redis.set(set_key, result) + + def set_result_with_expiration(self, job_id: str, result: bytes, ex: int): + with self.telemetry.tracer.start_as_current_span( + "job.set_result_with_expiration" + ) as span: + span.set_attribute("job_id", job_id) + set_key = f"result:{job_id}:{str(self.service_type)}" + span.set_attribute("set_key", set_key) + self.redis.set(set_key, result, ex=ex) + + def get_result(self, job_id: str): + with self.telemetry.tracer.start_as_current_span("job.get_result") as span: + span.set_attribute("job_id", job_id) + get_key = f"result:{job_id}:{str(self.service_type)}" + span.set_attribute("get_key", get_key) + result = self.redis.get(get_key) + return result if result else None + + def get_status(self, job_id: str): + with self.telemetry.tracer.start_as_current_span("job.get_status") as span: + span.set_attribute("job_id", job_id) + # Get raw bytes and decode manually + hget_key = f"status:{job_id}:{str(self.service_type)}" + span.set_attribute("hget_key", hget_key) + status = self.redis.hgetall(hget_key) + if not status: + raise ValueError("Job not found") + # Decode bytes to strings for each field + return {k.decode(): v.decode() for k, v in status.items()} + + def cleanup_old_jobs(self, max_age=3600): + current_time = time.time() + removed = 0 + pattern = f"status:*:{str(self.service_type)}" + for key in self.redis.scan_iter(match=pattern): + status = self.redis.hgetall(key) + try: + timestamp = float(status[b"timestamp"].decode()) + if timestamp < current_time - max_age: + self.redis.delete(key) + job_id = key.split(b":")[1].decode() + self.redis.delete(f"result:{job_id}:{self.service_type}") + removed += 1 + except (KeyError, ValueError): + # Handle malformed status entries + continue + return removed diff --git a/frontend/shared/shared/llmmanager.py b/frontend/shared/shared/llmmanager.py new file mode 100644 index 0000000..24c5ba9 --- /dev/null +++ b/frontend/shared/shared/llmmanager.py @@ -0,0 +1,276 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from langchain_nvidia_ai_endpoints import ChatNVIDIA +from typing import List, Dict, Any, Optional, Union +import logging +import ujson as json +from shared.otel import OpenTelemetryInstrumentation +from opentelemetry.trace.status import StatusCode +from pathlib import Path +from dataclasses import dataclass +from langchain_core.messages import AIMessage + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@dataclass +class ModelConfig: + name: str + api_base: str + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ModelConfig": + return cls( + name=data["name"], + api_base=data["api_base"], + ) + + +class LLMManager: + """ + A lightweight and user friendly wrapper over Langchain's ChatNVIDIA class. We use this class + to abstract away all Langchain functionalities including models, async/sync queries, + structured outputs, types, streaming and more. It also comes with OTEL telemetry out of the box + for all queries. It is specifically tailored for singular invocations. + + Configs can be overridden by providing a custom config file. Currently the defaults are + hardcoded to build.nvidia.com endpoints. + + Usage: + >>> llm_manager = LLMManager(api_key, telemetry) + >>> llm_manager.query_sync("reasoning", [{"role": "user", "content": "Hello, world!"}], "test") + """ + + DEFAULT_CONFIGS = { + "reasoning": { + "name": "meta/llama-3.1-405b-instruct", + "api_base": "https://integrate.api.nvidia.com/v1", + }, + "iteration": { + "name": "meta/llama-3.1-405b-instruct", + "api_base": "https://integrate.api.nvidia.com/v1", + }, + "json": { + "name": "meta/llama-3.1-70b-instruct", + "api_base": "https://integrate.api.nvidia.com/v1", + }, + } + + def __init__( + self, + api_key: str, + telemetry: OpenTelemetryInstrumentation, + config_path: Optional[str] = None, + ): + """ + Initialize LLMManager with telemetry + requires: OpenTelemetryInstrumentation instance for tracing + """ + try: + self.api_key = api_key + self.telemetry = telemetry + self._llm_cache: Dict[str, ChatNVIDIA] = {} + self.model_configs = self._load_configurations(config_path) + logger.info("Successfully initialized LLMManager") + except Exception as e: + logger.error(f"Failed to initialize LLMManager: {e}") + raise + + def _load_configurations( + self, config_path: Optional[str] + ) -> Dict[str, ModelConfig]: + """Load model configurations from JSON file if provided, otherwise use defaults""" + configs = self.DEFAULT_CONFIGS.copy() + if config_path: + try: + config_path = Path(config_path) + if config_path.exists(): + with config_path.open() as f: + custom_configs = json.load(f) + configs.update(custom_configs) + else: + logger.warning( + f"Config file {config_path} not found, using default configurations" + ) + except Exception as e: + logger.error(f"Error loading config file: {e}") + logger.warning("Using default configurations") + return {key: ModelConfig.from_dict(config) for key, config in configs.items()} + + def get_llm(self, model_key: str) -> ChatNVIDIA: + """Get or create a ChatNVIDIA model for the specified model key""" + if model_key not in self.model_configs: + raise ValueError(f"Unknown model key: {model_key}") + if model_key not in self._llm_cache: + config = self.model_configs[model_key] + self._llm_cache[model_key] = ChatNVIDIA( + model=config.name, + base_url=config.api_base, + nvidia_api_key=self.api_key, + max_tokens=None, + ) + return self._llm_cache[model_key] + + def query_sync( + self, + model_key: str, + messages: List[Dict[str, str]], + query_name: str, + json_schema: Optional[Dict] = None, + retries: int = 5, + ) -> Union[AIMessage, Dict[str, Any]]: + """Send a synchronous query to the specified model""" + with self.telemetry.tracer.start_as_current_span( + f"agent.query.{query_name}" + ) as span: + span.set_attribute("model_key", model_key) + span.set_attribute("retries", retries) + span.set_attribute("async", False) + + try: + llm = self.get_llm(model_key) + if json_schema: + llm = llm.with_structured_output(json_schema) + llm = llm.with_retry( + stop_after_attempt=retries, wait_exponential_jitter=True + ) + resp = llm.invoke(messages) + return resp + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error(f"Query failed: {e}") + raise Exception( + f"Failed to get response after {retries} attempts" + ) from e + + async def query_async( + self, + model_key: str, + messages: List[Dict[str, str]], + query_name: str, + json_schema: Optional[Dict] = None, + retries: int = 5, + ) -> Union[AIMessage, Dict[str, Any]]: + """Send an asynchronous query to the specified model""" + with self.telemetry.tracer.start_as_current_span( + f"agent.query.{query_name}" + ) as span: + span.set_attribute("model_key", model_key) + span.set_attribute("retries", retries) + span.set_attribute("async", True) + + try: + llm = self.get_llm(model_key) + if json_schema: + llm = llm.with_structured_output(json_schema) + llm = llm.with_retry( + stop_after_attempt=retries, wait_exponential_jitter=True + ) + resp = await llm.ainvoke(messages) + return resp + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error(f"Query failed: {e}") + raise Exception( + f"Failed to get response after {retries} attempts" + ) from e + + def stream_sync( + self, + model_key: str, + messages: List[Dict[str, str]], + query_name: str, + json_schema: Optional[Dict] = None, + retries: int = 5, + ) -> Union[str, Dict[str, Any]]: + """Send a synchronous streaming query to the specified model""" + with self.telemetry.tracer.start_as_current_span( + f"agent.stream.{query_name}" + ) as span: + span.set_attribute("model_key", model_key) + span.set_attribute("retries", retries) + span.set_attribute("async", False) + + try: + llm = self.get_llm(model_key) + if json_schema: + llm = llm.with_structured_output(json_schema) + llm = llm.with_retry( + stop_after_attempt=retries, wait_exponential_jitter=True + ) + + last_chunk = None + for chunk in llm.stream(messages): + # AIMessage returns content and JSON returns the dict itself + if hasattr(chunk, "content"): + last_chunk = chunk.content + else: + last_chunk = chunk + + return last_chunk + + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error(f"Streaming query failed: {e}") + raise Exception( + f"Failed to get streaming response after {retries} attempts" + ) from e + + async def stream_async( + self, + model_key: str, + messages: List[Dict[str, str]], + query_name: str, + json_schema: Optional[Dict] = None, + retries: int = 5, + ) -> Union[str, Dict[str, Any]]: + """Send an asynchronous streaming query to the specified model""" + with self.telemetry.tracer.start_as_current_span( + f"agent.stream.{query_name}" + ) as span: + span.set_attribute("model_key", model_key) + span.set_attribute("retries", retries) + span.set_attribute("async", True) + + try: + llm = self.get_llm(model_key) + if json_schema: + llm = llm.with_structured_output(json_schema) + llm = llm.with_retry( + stop_after_attempt=retries, wait_exponential_jitter=True + ) + + last_chunk = None + async for chunk in llm.astream(messages): + # AIMessage returns content and JSON returns the dict itself + if hasattr(chunk, "content"): + last_chunk = chunk.content + else: + last_chunk = chunk + + return last_chunk + + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error(f"Async streaming query failed: {e}") + raise Exception( + f"Failed to get streaming response after {retries} attempts" + ) from e diff --git a/frontend/shared/shared/otel.py b/frontend/shared/shared/otel.py new file mode 100644 index 0000000..c92b496 --- /dev/null +++ b/frontend/shared/shared/otel.py @@ -0,0 +1,127 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import Optional +import logging +import os + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.instrumentation.requests import RequestsInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@dataclass +class OpenTelemetryConfig: + """Configuration for OpenTelemetry setup.""" + + service_name: str + otlp_endpoint: str = "http://jaeger:4317" + enable_redis: bool = True + enable_requests: bool = True + enable_httpx: bool = True + enable_urllib3: bool = True + + +class OpenTelemetryInstrumentation: + """ + Lightweight OTEL wrapper + + Example usage: + telemetry = OpenTelemetryInstrumentation() + app = FastAPI() + telemetry.initialize(app, "api-service") + + # In code + with telemetry.tracer.start_as_current_span("operation_name") as span: + span.set_attribute("key", "value") + """ + + def __init__(self): + self._tracer: Optional[trace.Tracer] = None + self._config: Optional[OpenTelemetryConfig] = None + + @property + def tracer(self) -> trace.Tracer: + """Get the configured tracer instance.""" + if not self._tracer: + raise RuntimeError( + "OpenTelemetry has not been initialized. Call initialize() first." + ) + return self._tracer + + def initialize( + self, config: OpenTelemetryConfig, app=None + ) -> "OpenTelemetryInstrumentation": + """ + Initialize OpenTelemetry instrumentation with the given configuration. + + Args: + app: The FastAPI application instance + config: OpenTelemetryConfig instance containing configuration options + + Returns: + self for method chaining + """ + self._config = config + logger.info(f"Setting up tracing for service: {self._config.service_name}") + logger.info(f"Container ID: {os.uname().nodename}") + self._setup_tracing() + self._instrument_app(app) + return self + + def _setup_tracing(self) -> None: + """Set up the OpenTelemetry tracer provider and processors.""" + resource = Resource.create({"service.name": self._config.service_name}) + + provider = TracerProvider(resource=resource) + processor = BatchSpanProcessor( + OTLPSpanExporter(endpoint=self._config.otlp_endpoint) + ) + + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + self._tracer = trace.get_tracer(self._config.service_name) + + def _instrument_app(self, app=None) -> None: + """Instrument the FastAPI application and optional components.""" + # Instrument FastAPI + if app: + FastAPIInstrumentor.instrument_app(app) + + # Instrument Redis if enabled + if self._config.enable_redis: + RedisInstrumentor().instrument() + + # Instrument requests library if enabled + if self._config.enable_requests: + RequestsInstrumentor().instrument() + + if self._config.enable_httpx: + HTTPXClientInstrumentor().instrument() + + if self._config.enable_urllib3: + URLLib3Instrumentor().instrument() diff --git a/frontend/shared/shared/pdf_types.py b/frontend/shared/shared/pdf_types.py new file mode 100644 index 0000000..6b5b621 --- /dev/null +++ b/frontend/shared/shared/pdf_types.py @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pydantic import BaseModel, Field +from typing import Optional, Union, Literal +from datetime import datetime +from enum import Enum + + +class ConversionStatus(str, Enum): + SUCCESS = "success" + FAILED = "failed" + + +class PDFConversionResult(BaseModel): + filename: str + content: str = "" + status: ConversionStatus + error: Optional[str] = None + + +class PDFMetadata(BaseModel): + filename: str + markdown: str = "" + summary: str = "" + status: ConversionStatus + type: Union[Literal["target"], Literal["context"]] + error: Optional[str] = None + created_at: datetime = Field(default_factory=datetime.utcnow) diff --git a/frontend/shared/shared/podcast_types.py b/frontend/shared/shared/podcast_types.py new file mode 100644 index 0000000..95b4610 --- /dev/null +++ b/frontend/shared/shared/podcast_types.py @@ -0,0 +1,60 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pydantic import BaseModel +from typing import Optional, Dict, Literal, List + + +class SavedPodcast(BaseModel): + job_id: str + filename: str + created_at: str + size: int + transcription_params: Optional[Dict] = {} + + +class SavedPodcastWithAudio(SavedPodcast): + audio_data: str + + +class DialogueEntry(BaseModel): + text: str + speaker: Literal["speaker-1", "speaker-2"] + + +class Conversation(BaseModel): + scratchpad: str + dialogue: List[DialogueEntry] + + +class SegmentPoint(BaseModel): + description: str + + +class SegmentTopic(BaseModel): + title: str + points: List[SegmentPoint] + + +class PodcastSegment(BaseModel): + section: str + topics: List[SegmentTopic] + duration: int + references: List[str] + + +class PodcastOutline(BaseModel): + title: str + segments: List[PodcastSegment] diff --git a/frontend/shared/shared/prompt_tracker.py b/frontend/shared/shared/prompt_tracker.py new file mode 100644 index 0000000..5a8f655 --- /dev/null +++ b/frontend/shared/shared/prompt_tracker.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict +import time +import logging +from .storage import StorageManager +from .prompt_types import ProcessingStep, PromptTracker as PromptTrackerModel + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class PromptTracker: + """Track prompts and responses and save them to storage""" + + def __init__(self, job_id: str, user_id: str, storage_manager: StorageManager): + self.job_id = job_id + self.user_id = user_id + self.steps: Dict[str, ProcessingStep] = {} + self.storage_manager = storage_manager + + def track(self, step_name: str, prompt: str, model: str, response: str = None): + """Track a processing step""" + self.steps[step_name] = ProcessingStep( + step_name=step_name, + prompt=prompt, + response=response if response else "", + model=model, + timestamp=time.time(), + ) + if response: + self._save() + logger.info(f"Tracked step {step_name} for {self.job_id}") + + def update_result(self, step_name: str, response: str): + """Update the response for an existing step""" + if step_name in self.steps: + self.steps[step_name].response = response + self._save() + logger.info(f"Updated response for step {step_name}") + else: + logger.warning(f"Step {step_name} not found in prompt tracker") + + def _save(self): + """Save the current state to storage""" + tracker = PromptTrackerModel(steps=list(self.steps.values())) + self.storage_manager.store_file( + self.user_id, + self.job_id, + tracker.model_dump_json().encode(), + f"{self.job_id}_prompt_tracker.json", + "application/json", + ) + logger.info( + f"Stored prompt tracker for {self.job_id} in minio. Length: {len(self.steps)}" + ) diff --git a/frontend/shared/shared/prompt_types.py b/frontend/shared/shared/prompt_types.py new file mode 100644 index 0000000..1b41074 --- /dev/null +++ b/frontend/shared/shared/prompt_types.py @@ -0,0 +1,29 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pydantic import BaseModel +from typing import List + + +class ProcessingStep(BaseModel): + step_name: str + prompt: str + response: str + model: str + timestamp: float + + +class PromptTracker(BaseModel): + steps: List[ProcessingStep] diff --git a/frontend/shared/shared/storage.py b/frontend/shared/shared/storage.py new file mode 100644 index 0000000..c480a84 --- /dev/null +++ b/frontend/shared/shared/storage.py @@ -0,0 +1,324 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import ujson as json +import base64 +from minio import Minio +from minio.error import S3Error +from shared.api_types import TranscriptionParams +from shared.otel import OpenTelemetryInstrumentation +from opentelemetry.trace.status import StatusCode +import os +import logging +import urllib3 +from urllib3 import Retry +from urllib3.util import Timeout +from typing import Optional + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Minio config +MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000") +MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin") +MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin") +MINIO_BUCKET_NAME = os.getenv("MINIO_BUCKET_NAME", "audio-results") + + +# TODO: use this to wrap redis as well +# TODO: wrap errors in StorageError +# TODO: implement cleanup and delete as well +class StorageManager: + def __init__(self, telemetry: OpenTelemetryInstrumentation): + """ + Initialize MinIO client and ensure bucket exists + requires: OpenTelemetryInstrumentation instance for tracing since Minio + does not have an auto otel instrumentor + """ + try: + self.telemetry: OpenTelemetryInstrumentation = telemetry + # pass in http_client for tracing + http_client = urllib3.PoolManager( + timeout=Timeout(connect=5, read=5), + maxsize=10, + retries=Retry( + total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504] + ), + ) + self.client = Minio( + os.getenv("MINIO_ENDPOINT", "minio:9000"), + access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"), + secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"), + secure=os.getenv("MINIO_SECURE", "false").lower() == "true", + http_client=http_client, + ) + + self.bucket_name = os.getenv("MINIO_BUCKET_NAME", "audio-results") + self._ensure_bucket_exists() + logger.info("Successfully initialized MinIO storage") + + except Exception as e: + logger.error(f"Failed to initialize MinIO client: {e}") + raise + + def _ensure_bucket_exists(self): + try: + if not self.client.bucket_exists(self.bucket_name): + self.client.make_bucket(self.bucket_name) + except Exception as e: + logger.error(f"Failed to ensure bucket exists: {e}") + raise + + def _get_object_path(self, user_id: str, job_id: str, filename: str) -> str: + """Generate the full object path including user isolation""" + return f"{user_id}/{job_id}/{filename}" + + def store_file( + self, + user_id: str, + job_id: str, + content: bytes, + filename: str, + content_type: str, + metadata: dict = None, + ) -> None: + """Store any file type in MinIO with metadata""" + with self.telemetry.tracer.start_as_current_span("store_file") as span: + span.set_attribute("user_id", user_id) + span.set_attribute("job_id", job_id) + span.set_attribute("filename", filename) + try: + object_name = self._get_object_path(user_id, job_id, filename) + self.client.put_object( + self.bucket_name, + object_name, + io.BytesIO(content), + length=len(content), + content_type=content_type, + metadata=metadata.model_dump() + if hasattr(metadata, "model_dump") + else metadata, + ) + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error( + f"Failed to store file {filename} for user {user_id}, job {job_id}: {str(e)}" + ) + raise + + def store_audio( + self, + user_id: str, + job_id: str, + audio_content: bytes, + filename: str, + transcription_params: TranscriptionParams, + ): + """Store audio file with metadata in MinIO""" + with self.telemetry.tracer.start_as_current_span("store_audio") as span: + span.set_attribute("job_id", job_id) + span.set_attribute("user_id", user_id) + span.set_attribute("filename", filename) + try: + object_name = self._get_object_path(user_id, job_id, filename) + + # Convert transcription params to JSON string for metadata + params_json = json.dumps(transcription_params.model_dump()) + + # Create metadata dictionary with transcription params + metadata = {"X-Amz-Meta-Transcription-Params": params_json} + + self.client.put_object( + self.bucket_name, + object_name, + io.BytesIO(audio_content), + len(audio_content), + content_type="audio/mpeg", + metadata=metadata, + ) + logger.info( + f"Stored audio for user {user_id}, job {job_id} in MinIO as {object_name} with metadata" + ) + + except S3Error as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error(f"Failed to store audio in MinIO: {e}") + raise + + def get_podcast_audio(self, user_id: str, job_id: str) -> Optional[str]: + """Get the audio data for a specific podcast by job_id""" + with self.telemetry.tracer.start_as_current_span("get_podcast_audio") as span: + span.set_attribute("job_id", job_id) + span.set_attribute("user_id", user_id) + try: + # Find the file with matching user_id and job_id + prefix = f"{user_id}/{job_id}/" + objects = self.client.list_objects( + self.bucket_name, prefix=prefix, recursive=True + ) + + for obj in objects: + if obj.object_name.endswith(".mp3"): + span.set_attribute("audio_file", obj.object_name) + audio_data = self.client.get_object( + self.bucket_name, obj.object_name + ).read() + return base64.b64encode(audio_data).decode("utf-8") + + return None + + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error( + f"Failed to get audio for user {user_id}, job {job_id}: {str(e)}" + ) + raise + + def get_file(self, user_id: str, job_id: str, filename: str) -> Optional[bytes]: + """Get any file from storage by user_id, job_id and filename""" + with self.telemetry.tracer.start_as_current_span("get_file") as span: + span.set_attribute("job_id", job_id) + span.set_attribute("user_id", user_id) + span.set_attribute("filename", filename) + try: + object_name = self._get_object_path(user_id, job_id, filename) + + try: + data = self.client.get_object(self.bucket_name, object_name).read() + return data + except S3Error as e: + span.set_attribute("error", str(e)) + if e.code == "NoSuchKey": + return None + raise + + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error( + f"Failed to get file {filename} for user {user_id}, job {job_id}: {str(e)}" + ) + raise + + def delete_job_files(self, user_id: str, job_id: str) -> bool: + """Delete all files associated with a user_id and job_id""" + with self.telemetry.tracer.start_as_current_span("delete_job_files") as span: + span.set_attribute("job_id", job_id) + span.set_attribute("user_id", user_id) + try: + # List all objects with the user_id/job_id prefix + prefix = f"{user_id}/{job_id}/" + objects = self.client.list_objects( + self.bucket_name, prefix=prefix, recursive=True + ) + + # Delete each object + for obj in objects: + self.client.remove_object(self.bucket_name, obj.object_name) + logger.info(f"Deleted object: {obj.object_name}") + + return True + + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error( + f"Failed to delete files for user {user_id}, job {job_id}: {str(e)}" + ) + return False + + def list_files_metadata(self, user_id: str = None): + """Lists metadata filtered by user_id if provided""" + with self.telemetry.tracer.start_as_current_span("list_files_metadata") as span: + try: + # If user_id is provided, use it as prefix to filter results + prefix = f"{user_id}/" if user_id else "" + span.set_attribute("user_id", user_id) + span.set_attribute("prefix", prefix) + + objects = self.client.list_objects( + self.bucket_name, prefix=prefix, recursive=True + ) + files = [] + + for obj in objects: + logger.info(f"Object: {obj.object_name}") + if obj.object_name.endswith("/"): + continue + + try: + stat = self.client.stat_object( + self.bucket_name, obj.object_name + ) + path_parts = obj.object_name.split("/") + logger.info(f"Path parts: {path_parts}") + + if not path_parts[-1].endswith(".mp3"): + continue + + # Update to handle new path structure: user_id/job_id/filename + user_id = path_parts[0] + job_id = path_parts[1] + + file_info = { + "user_id": user_id, + "job_id": job_id, + "filename": path_parts[-1], + "size": stat.size, + "created_at": obj.last_modified.isoformat(), + "path": obj.object_name, + "transcription_params": {}, + } + + if stat.metadata: + try: + params = stat.metadata.get( + "X-Amz-Meta-Transcription-Params" + ) + if params: + file_info["transcription_params"] = json.loads( + params + ) + except json.JSONDecodeError: + logger.warning( + f"Could not parse transcription params for {obj.object_name}" + ) + + files.append(file_info) + logger.info( + f"Found file: {obj.object_name}, size: {stat.size} bytes" + ) + + except Exception as e: + logger.error( + f"Error processing object {obj.object_name}: {str(e)}" + ) + continue + + files.sort(key=lambda x: x["created_at"], reverse=True) + logger.info( + f"Successfully listed {len(files)} metadata for {len(files)} files from MinIO" + ) + return files + + except Exception as e: + span.set_status(StatusCode.ERROR) + span.record_exception(e) + logger.error(f"Failed to list files from MinIO: {str(e)}") + raise diff --git a/frontend/static/diagram.png b/frontend/static/diagram.png new file mode 100644 index 0000000..249f2e0 Binary files /dev/null and b/frontend/static/diagram.png differ diff --git a/frontend/utils/email_demo.py b/frontend/utils/email_demo.py new file mode 100644 index 0000000..91bf1bd --- /dev/null +++ b/frontend/utils/email_demo.py @@ -0,0 +1,502 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests +import os +import json as json +import time +from datetime import datetime +from threading import Thread, Event +import websockets +import asyncio +from urllib.parse import urljoin +import argparse +from typing import List + +import smtplib +from email.message import EmailMessage +from email.mime.base import MIMEBase +from email import encoders + +# Add global TEST_USER_ID +TEST_USER_ID = "test-userid" + + +class StatusMonitor: + def __init__(self, base_url, job_id): + self.base_url = base_url + self.job_id = job_id + self.ws_url = self._get_ws_url(base_url) + self.stop_event = Event() + self.services = {"pdf", "agent", "tts"} + self.last_statuses = {service: None for service in self.services} + self.tts_completed = Event() + self.websocket = None + self.reconnect_delay = 1.0 + self.max_reconnect_delay = 30.0 + self.ready_event = asyncio.Event() + + def _get_ws_url(self, base_url): + """Convert HTTP URL to WebSocket URL""" + if base_url.startswith("https://"): + ws_base = "wss://" + base_url[8:] + else: + ws_base = "ws://" + base_url[7:] + return urljoin(ws_base, f"/ws/status/{self.job_id}") + + def get_time(self): + return datetime.now().strftime("%H:%M:%S") + + def start(self): + """Start the WebSocket monitoring in a separate thread""" + self.thread = Thread(target=self._run_async_loop) + self.thread.start() + + def stop(self): + """Stop the WebSocket monitoring""" + self.stop_event.set() + self.thread.join() + + def _run_async_loop(self): + """Run the asyncio event loop in a separate thread""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self._monitor_status()) + + async def _monitor_status(self): + while not self.stop_event.is_set(): + try: + async with websockets.connect(self.ws_url) as websocket: + self.websocket = websocket + self.reconnect_delay = 1.0 + print(f"[{self.get_time()}] Connected to status WebSocket") + + while not self.stop_event.is_set(): + try: + message = await asyncio.wait_for( + websocket.recv(), timeout=30 + ) + + # Handle ready check message + try: + data = json.loads(message) + if data.get("type") == "ready_check": + await websocket.send("ready") + print( + f"[{self.get_time()}] Sent ready acknowledgment" + ) + continue + except json.JSONDecodeError: + pass + + await self._handle_message(message) + except asyncio.TimeoutError: + try: + pong_waiter = await websocket.ping() + await pong_waiter + except Exception: + break + + except websockets.exceptions.ConnectionClosed: + self.ready_event.clear() + if not self.stop_event.is_set(): + print( + f"[{self.get_time()}] WebSocket connection closed, reconnecting..." + ) + + except Exception as e: + self.ready_event.clear() + if not self.stop_event.is_set(): + print(f"[{self.get_time()}] WebSocket error: {e}, reconnecting...") + + if not self.stop_event.is_set(): + await asyncio.sleep(self.reconnect_delay) + self.reconnect_delay = min( + self.reconnect_delay * 1.5, self.max_reconnect_delay + ) + + async def _handle_message(self, message): + """Handle incoming WebSocket messages""" + try: + data = json.loads(message) + service = data.get("service") + status = data.get("status") + msg = data.get("message", "") + + if service in self.services: + current_status = f"{service}: {status} - {msg}" + if current_status != self.last_statuses[service]: + print(f"[{self.get_time()}] {current_status}") + self.last_statuses[service] = current_status + + if status == "failed": + print(f"[{self.get_time()}] Job failed in {service}: {msg}") + self.stop_event.set() + + if service == "tts" and status == "completed": + self.tts_completed.set() + self.stop_event.set() + + except json.JSONDecodeError: + print(f"[{self.get_time()}] Received invalid JSON: {message}") + except Exception as e: + print(f"[{self.get_time()}] Error processing message: {e}") + + +def get_output_with_retry(base_url: str, job_id: str, max_retries=5, retry_delay=1): + """Retry getting output with exponential backoff""" + for attempt in range(max_retries): + try: + response = requests.get( + f"{base_url}/output/{job_id}", params={"userId": TEST_USER_ID} + ) + if response.status_code == 200: + return response.content + elif response.status_code == 404: + wait_time = retry_delay * (2**attempt) + print( + f"[datetime.now().strftime('%H:%M:%S')] Output not ready yet, retrying in {wait_time:.1f}s..." + ) + time.sleep(wait_time) + continue + else: + response.raise_for_status() + except requests.RequestException as e: + print(f"[datetime.now().strftime('%H:%M:%S')] Error getting output: {e}") + if attempt == max_retries - 1: + raise + time.sleep(retry_delay * (2**attempt)) + + raise TimeoutError("Failed to get output after maximum retries") + + +def test_saved_podcasts(base_url: str, job_id: str, max_retries=5, retry_delay=5): + """Test the saved podcasts endpoints with retry logic""" + print( + f"\n[{datetime.now().strftime('%H:%M:%S')}] Testing saved podcasts endpoints..." + ) + + # Test 1: Get all saved podcasts with retry + print("\nTesting list all podcasts endpoint...") + for attempt in range(max_retries): + response = requests.get( + f"{base_url}/saved_podcasts", params={"userId": TEST_USER_ID} + ) + assert ( + response.status_code == 200 + ), f"Failed to get saved podcasts: {response.text}" + podcasts = response.json()["podcasts"] + print(f"Found {len(podcasts)} saved podcasts") + + # Check if our job_id is in the list + job_ids = [podcast["job_id"] for podcast in podcasts] + if job_id in job_ids: + print(f"Successfully found job_id {job_id} in saved podcasts list") + break + elif attempt < max_retries - 1: + wait_time = retry_delay * (2**attempt) + print( + f"Job ID not found yet, retrying in {wait_time:.1f}s... (attempt {attempt + 1}/{max_retries})" + ) + time.sleep(wait_time) + continue + else: + assert False, f"Recently created job_id {job_id} not found in saved podcasts after {max_retries} attempts" + + # Test 2: Get specific podcast metadata + print("\nTesting individual podcast metadata endpoint...") + response = requests.get( + f"{base_url}/saved_podcast/{job_id}/metadata", params={"userId": TEST_USER_ID} + ) + assert ( + response.status_code == 200 + ), f"Failed to get podcast metadata: {response.text}" + metadata = response.json() + print(f"Retrieved metadata for podcast: {metadata.get('filename', 'unknown')}") + print(f"Metadata: {json.dumps(metadata, indent=2)}") + + # Test 3: Get specific podcast audio + print("\nTesting individual podcast audio endpoint...") + response = requests.get( + f"{base_url}/saved_podcast/{job_id}/audio", params={"userId": TEST_USER_ID} + ) + assert response.status_code == 200, f"Failed to get podcast audio: {response.text}" + audio_data = response.content + print(f"Successfully retrieved audio data, size: {len(audio_data)} bytes") + + +def test_api( + base_url: str, + target_files: List[str], + context_files: List[str], + email: str, + monologue: bool = False, + vdb: bool = False, +): + voice_mapping = { + "speaker-1": "iP95p4xoKVk53GoZ742B", + } + + if not monologue: + voice_mapping["speaker-2"] = "9BWtsMINqrJLrRacOk9x" + + process_url = f"{base_url}/process_pdf" + + # Update path resolution + current_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.dirname(current_dir) + samples_dir = os.path.join(project_root, "samples") + + # Prepare the payload with updated schema and userId + transcription_params = { + "name": "ishan-test", + "duration": 2, + "speaker_1_name": "Bob", + "voice_mapping": voice_mapping, + "guide": None, + "monologue": monologue, + "userId": TEST_USER_ID, + "vdb_task": vdb, + } + + if not monologue: + transcription_params["speaker_2_name"] = "Kate" + + print( + f"\n[{datetime.now().strftime('%H:%M:%S')}] Submitting PDFs for processing..." + ) + print(f"Using voices: {voice_mapping}") + + # Prepare multipart form data + form_data = [] + + # Process target files + for pdf_file in target_files: + if not os.path.isabs(pdf_file): + pdf_file = os.path.join(samples_dir, pdf_file) + + with open(pdf_file, "rb") as f: + content = f.read() + form_data.append( + ( + "target_files", + (os.path.basename(pdf_file), content, "application/pdf"), + ) + ) + + # Process context files + if context_files is not None: + for pdf_file in context_files: + if not os.path.isabs(pdf_file): + pdf_file = os.path.join(samples_dir, pdf_file) + + with open(pdf_file, "rb") as f: + content = f.read() + form_data.append( + ( + "context_files", + (os.path.basename(pdf_file), content, "application/pdf"), + ) + ) + + # Add transcription parameters + form_data.append(("transcription_params", (None, json.dumps(transcription_params)))) + + try: + response = requests.post(process_url, files=form_data) + + assert ( + response.status_code == 202 + ), f"Expected status code 202, but got {response.status_code}. Response: {response.text}" + job_data = response.json() + assert "job_id" in job_data, "Response missing job_id" + job_id = job_data["job_id"] + print(f"[{datetime.now().strftime('%H:%M:%S')}] Job ID received: {job_id}") + + # Step 2: Start monitoring status via WebSocket + monitor = StatusMonitor(base_url, job_id) + monitor.start() + + try: + # Wait for TTS completion or timeout + max_wait = 40 * 60 + if not monitor.tts_completed.wait(timeout=max_wait): + raise TimeoutError(f"Test timed out after {max_wait} seconds") + + # If we get here, TTS completed successfully + print( + f"\n[{datetime.now().strftime('%H:%M:%S')}] TTS processing completed, retrieving audio file..." + ) + + # Get the final output with retry logic + audio_content = get_output_with_retry(base_url, job_id) + + # Save the audio file + output_path = os.path.join("/project/frontend/demo_outputs/", str(email[0]).split('@')[0] + "-output.mp3") + with open(output_path, "wb") as f: + f.write(audio_content) + print( + f"[{datetime.now().strftime('%H:%M:%S')}] Audio file saved as '{output_path}'" + ) + + # Test saved podcasts endpoints with the newly created job_id + test_saved_podcasts(base_url, job_id) + + # Test RAG endpoint if vdb flag is enabled + if vdb: + print("\nTesting RAG endpoint...") + test_query = "What is the main topic of this document?" + rag_response = requests.post( + f"{base_url}/query_vector_db", + json={"query": test_query, "k": 3, "job_id": job_id}, + ) + assert ( + rag_response.status_code == 200 + ), f"RAG endpoint failed: {rag_response.text}" + rag_results = rag_response.json() + print(f"RAG Query: '{test_query}'") + print(f"RAG Results: {json.dumps(rag_results, indent=2)}") + + finally: + monitor.stop() + return job_id + + except Exception as e: + print(f"Error during PDF submission: {e}") + raise + +def send_file_via_email(file_location, sender_email, recipient_email): + # Email configuration + print("Configuring Sender Details") + sender_email = sender_email + email_password = os.environ['SENDER_EMAIL_PASSWORD'] + smtp_server = "smtp.gmail.com" + smtp_port = 587 + + # Create email message + print("Configuring Email Message") + msg = EmailMessage() + msg['Subject'] = '[Auto-generated] Your podcast is here!' + msg['From'] = sender_email + msg['To'] = recipient_email + msg.set_content('Your generated podcast is here! Please find the attached file.\n\nThank you for using the AI Research Assistant Blueprint powered by NVIDIA AI Workbench!') + + # Add file attachment + print("Adding podcast as attachment") + with open(file_location, 'rb') as file: + file_data = file.read() + file_name = os.path.basename(file_location) + + msg.add_attachment(file_data, maintype='application', subtype='octet-stream', filename=file_name) + + # Send email + print("Attempting to send email.") + try: + with smtplib.SMTP(smtp_server, smtp_port) as server: + server.starttls() + server.login(sender_email, email_password) + server.send_message(msg) + print(f"File '{file_name}' sent successfully to {recipient_email}") + except Exception as e: + print(f"An error occurred: {str(e)}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Process PDF files for audio conversion", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" + Examples: + # Process with target and context files + python test.py --target main.pdf --context context1.pdf context2.pdf + + # Process with only context files + python test.py --context file1.pdf file2.pdf file3.pdf + + # Process with multiple target files + python test.py --target target1.pdf target2.pdf --context context1.pdf + """, + ) + + parser.add_argument( + "--target", + nargs="+", + default=[], + help="PDF files to use as targets", + metavar="PDF", + ) + parser.add_argument( + "--context", + nargs="+", + default=[], + help="PDF files to use as context", + metavar="PDF", + ) + parser.add_argument( + "--api-url", + default=os.getenv("API_SERVICE_URL", "http://localhost:8002"), + help="API service URL (default: from API_SERVICE_URL env var or http://localhost:8002)", + ) + parser.add_argument( + "--monologue", + action="store_true", + help="Generate a monologue instead of a dialogue", + ) + parser.add_argument( + "--vdb", + action="store_true", + help="Enable Vector Database processing", + ) + parser.add_argument( + "--sender_email", + nargs="+", + default="", + help="Sender email for podcast recording (GMail)", + metavar="EMAIL", + ) + parser.add_argument( + "--recipient_email", + nargs="+", + default="", + help="Recipient email for podcast recording", + metavar="EMAIL", + ) + + args = parser.parse_args() + + # Validate that at least one file was provided + if not args.target and not args.context: + parser.error( + "At least one PDF file must be provided (either target or context)" + ) + + print(f"API URL: {args.api_url}") + print(f"Target PDF files: {args.target}") + print(f"Context PDF files: {args.context}") + print(f"Sender Email: {args.sender_email}") + print(f"Recipient Email: {args.recipient_email}") + print(f"Monologue mode: {args.monologue}") + print(f"VDB mode: {args.vdb}") + print(f"Using test user ID: {TEST_USER_ID}") + + test_api( + args.api_url, + args.target, + args.context, + args.recipient_email, + args.monologue, + args.vdb, + ) + + send_file_via_email("/project/frontend/demo_outputs/" + str(args.recipient_email[0]).split('@')[0] + "-output.mp3", str(args.sender_email[0]), str(args.recipient_email[0])) diff --git a/frontend/utils/logger.py b/frontend/utils/logger.py new file mode 100644 index 0000000..f475a81 --- /dev/null +++ b/frontend/utils/logger.py @@ -0,0 +1,39 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +""" A logger class for capturing stdout and displaying it to the user on the app. """ + +class Logger: + def __init__(self, filename): + self.terminal = sys.stdout + self.log = open(filename, "w") + + def write(self, message): + self.terminal.write(message) + self.log.write(message) + + def flush(self): + self.terminal.flush() + self.log.flush() + + def isatty(self): + return False + +def read_logs(): + sys.stdout.flush() + with open("/project/frontend/output.log", "r") as f: + return f.read() \ No newline at end of file diff --git a/launchable/.gitkeep b/launchable/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/launchable/PDFtoPodcast.ipynb b/launchable/PDFtoPodcast.ipynb index 1fe4570..a55691e 100644 --- a/launchable/PDFtoPodcast.ipynb +++ b/launchable/PDFtoPodcast.ipynb @@ -616,7 +616,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/models.json b/models.json index 99284e3..acab7e8 100644 --- a/models.json +++ b/models.json @@ -11,4 +11,4 @@ "name": "meta/llama-3.1-70b-instruct", "api_base": "https://integrate.api.nvidia.com/v1" } -} +} \ No newline at end of file diff --git a/postBuild.bash b/postBuild.bash new file mode 100644 index 0000000..c5143e5 --- /dev/null +++ b/postBuild.bash @@ -0,0 +1,5 @@ +#!/bin/bash +# This file contains bash commands that will be executed at the end of the container build process, +# after all system packages and programming language specific package have been installed. +# +# Note: This file may be removed if you don't need to use it diff --git a/preBuild.bash b/preBuild.bash new file mode 100644 index 0000000..e2866a1 --- /dev/null +++ b/preBuild.bash @@ -0,0 +1,5 @@ +#!/bin/bash +# This file contains bash commands that will be executed at the beginning of the container build process, +# before any system packages or programming language specific package have been installed. +# +# Note: This file may be removed if you don't need to use it diff --git a/requirements.txt b/requirements.txt index cb72513..0f28d20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,6 @@ minio httpx jinja2 ruff -ujson \ No newline at end of file +ujson +jupyterlab>3.0 +gradio==4.43.0 diff --git a/samples/.gitkeep b/samples/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/services/.gitkeep b/services/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/shared/.gitkeep b/shared/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/.gitkeep b/tests/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/variables.env b/variables.env new file mode 100644 index 0000000..d5224a9 --- /dev/null +++ b/variables.env @@ -0,0 +1,8 @@ +# Set environment variables in the format KEY=VALUE, 1 per line +# This file will be sourced inside the project container when started. +# NOTE: If you change this file while the project is running, you must restart the project container for changes to take effect. + +MAX_CONCURRENT_REQUESTS=1 +API_SERVICE_URL=http://pdf-to-podcast-api-service-1:8002 +AI_WORKBENCH_FLAG=true +SENDER_EMAIL=To use email, you must also add SENDER_EMAIL_PASSWORD as a Secret! diff --git a/workbench/PDFtoPodcast.ipynb b/workbench/PDFtoPodcast.ipynb new file mode 100644 index 0000000..3984047 --- /dev/null +++ b/workbench/PDFtoPodcast.ipynb @@ -0,0 +1,415 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "6b998a5c-45a6-4f05-9c7c-b0f3c4c74c17", + "metadata": {}, + "source": [ + "# PDF to Podcast NVIDIA AI Blueprint" + ] + }, + { + "cell_type": "markdown", + "id": "8a0f2acb-52d9-49dd-9676-2d58715786f5", + "metadata": {}, + "source": [ + "Ever wished you could generate podcasts from your own private data? The PDF to podcast NVIDIA AI blueprint makes it possible to build an AI research assistant that creates engaging audio outputs from PDF files. At its core, PDF to podcast is a sophisticated system that:\n", + "\n", + "- Transforms dense PDF documents into natural, engaging conversations\n", + "- Creates AI-generated podcasts with either single-speaker or two-person formats\n", + "- Uses cutting-edge language models (powered by LLama 3.1-70B NIM, LLama 3.1-8B NIM, and LLama 3.1-405B NIM) to ensure high-quality content\n", + "- Leverages ElevenLabs' voice synthesis for natural-sounding audio" + ] + }, + { + "cell_type": "markdown", + "id": "51dd4f69", + "metadata": {}, + "source": [ + "![image]()\n" + ] + }, + { + "cell_type": "markdown", + "id": "5af66273-f19d-4d4f-8480-31c5c03c5951", + "metadata": {}, + "source": [ + "## Features\n", + "What sets PDF to podcast apart is its flexible, microservice-driven architecture. Whether you're running on a single machine or a distributed cluster, this blueprint can be adapted to your needs and comes packed with modern tooling and observability features. **We've built this for you to edit and deploy on your own infrastructure with ease.**\n", + "\n", + "#### Observability & Monitoring\n", + "- **Jaeger Tracing ๐Ÿ”** - experience full distributed tracing built in. Watch requests flow through different services and quickly identify bottlenecks in your processing pipeline as you add your own code\n", + "- **MinIO Object Storage ๐Ÿ“ฆ** - robust, S3-compatible storage for handling PDFs and generated audio content. Perfect for scaling from development to production workloads.\n", + "- **GPU Optimized PDF processing ๐Ÿ”ฅ** - use docling for lighting fast optimized PDF processing\n", + "\n", + "#### Development Experience\n", + "- **UV Package Management ๐Ÿš€** - Lightning-fast dependency management using UV, making environment setup a breeze with `make uv`\n", + "- **Docker Compose Integration ๐Ÿณ** - one click spins up the entire stack, with smart handling of GPU resources and service dependencies.\n", + "\n", + "#### Quality and Testing\n", + "- **Automated Quality Checks โœจ** - integrated `ruff` for Python linting and formatting, ensuring consistent code quality across contributions.\n", + "- **End-to-End Testing ๐Ÿงช** - comprehensive test suite for verifying podcast generation, from PDF ingestion to final audio output." + ] + }, + { + "cell_type": "markdown", + "id": "2293d461-8cd1-48fb-bce7-dbf8515ea3f4", + "metadata": {}, + "source": [ + "## Create a podcast!\n", + "\n", + "For this example, we'll directly call the API to generate the podcast. First we write some helper functions to interact with the API\n", + "\n", + "
\n", + " Important: Ensure the docker compose services are running. In NVIDIA AI Workbench, navigate to Environment > Compose > Start. If you would like to leverage a locally-running Llama-3.1-8b-instruct model for this blueprint, ensure the \"local\" profile is selected before starting compose in AI Workbench." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "61e18296-a63f-4745-9d5a-3f0148ffabe9", + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "import json\n", + "import time\n", + "from typing import List\n", + "from IPython.display import Audio\n", + "from pathlib import Path\n", + "import os\n", + "\n", + "BASE_URL = os.environ[\"API_SERVICE_URL\"]\n", + "\n", + "def generate_podcast(\n", + " target_pdf_paths: List[str], \n", + " name: str,\n", + " duration: int,\n", + " speaker_1_name: str,\n", + " context_pdf_paths: List[str] = None,\n", + " is_monologue: bool = False,\n", + " speaker_2_name: str = None,\n", + " guide: str = None\n", + ") -> str:\n", + " \"\"\"\n", + " Generate a podcast using the API.\n", + " \n", + " Args:\n", + " target_pdf_paths: List of paths to main PDFs to analyze\n", + " name: Name of the podcast\n", + " duration: Desired duration in minutes\n", + " speaker_1_name: Name of the first speaker\n", + " context_pdf_paths: Optional list of paths to context PDFs\n", + " is_monologue: Whether to generate a monologue\n", + " speaker_2_name: Name of second speaker (required if not monologue)\n", + " guide: Optional guidance for the podcast structure\n", + " \"\"\"\n", + " # Handle single path inputs\n", + " if isinstance(target_pdf_paths, str):\n", + " target_pdf_paths = [target_pdf_paths]\n", + " if isinstance(context_pdf_paths, str):\n", + " context_pdf_paths = [context_pdf_paths]\n", + " \n", + " files = []\n", + " \n", + " # Add all target PDFs\n", + " for pdf_path in target_pdf_paths:\n", + " content = Path(pdf_path).read_bytes()\n", + " files.append(('target_files', (Path(pdf_path).name, content, 'application/pdf')))\n", + " \n", + " # Add all context PDFs if provided\n", + " if context_pdf_paths:\n", + " for pdf_path in context_pdf_paths:\n", + " content = Path(pdf_path).read_bytes()\n", + " files.append(('context_files', (Path(pdf_path).name, content, 'application/pdf')))\n", + " \n", + " # Configure voice mapping\n", + " voice_mapping = {\n", + " \"speaker-1\": \"iP95p4xoKVk53GoZ742B\" \n", + " }\n", + " if not is_monologue:\n", + " voice_mapping[\"speaker-2\"] = \"9BWtsMINqrJLrRacOk9x\"\n", + " \n", + " # Create parameters\n", + " params = {\n", + " \"userId\": \"test-userid\",\n", + " \"name\": name,\n", + " \"duration\": duration,\n", + " \"monologue\": is_monologue,\n", + " \"speaker_1_name\": speaker_1_name,\n", + " \"voice_mapping\": voice_mapping,\n", + " \"guide\": guide,\n", + " \"vdb_task\": False\n", + " }\n", + " if not is_monologue:\n", + " params[\"speaker_2_name\"] = speaker_2_name\n", + " \n", + " response = requests.post(\n", + " f\"{BASE_URL}/process_pdf\", \n", + " files=files,\n", + " data={'transcription_params': json.dumps(params)}\n", + " )\n", + " if response.status_code != 202:\n", + " raise Exception(f\"Failed to submit podcast generation: {response.text}\")\n", + " \n", + " return response.json()['job_id']\n", + "\n", + "def get_status(job_id: str) -> dict:\n", + " \"\"\"Get the current status of all services for a job.\"\"\"\n", + " response = requests.get(f\"{BASE_URL}/status/{job_id}?userId=test-userid\")\n", + " if response.status_code != 200:\n", + " raise Exception(f\"Failed to get status: {response.text}\")\n", + " return response.json()\n", + "\n", + "def wait_for_completion(job_id: str, check_interval: int = 5, initial_delay: int = 10):\n", + " \"\"\"\n", + " Poll the status endpoint until the podcast is ready.\n", + " Shows a simplified progress view.\n", + " \"\"\"\n", + " print(f\"Waiting {initial_delay} seconds for job to initialize...\")\n", + " time.sleep(initial_delay)\n", + " \n", + " last_messages = {} # Track last message for each service to avoid duplication\n", + " \n", + " while True:\n", + " try:\n", + " statuses = get_status(job_id)\n", + " \n", + " # Check each service and only print if status changed\n", + " for service, status in statuses.items():\n", + " current_msg = status.get('message', '')\n", + " if current_msg != last_messages.get(service):\n", + " print(f\"[{service.upper()}] {current_msg}\")\n", + " last_messages[service] = current_msg\n", + " \n", + " # Check if everything is completed\n", + " all_completed = all(\n", + " status.get('status') == 'JobStatus.COMPLETED' \n", + " for status in statuses.values()\n", + " )\n", + " \n", + " if all_completed and 'tts' in statuses:\n", + " print(\"\\nPodcast generation completed!\")\n", + " return\n", + " \n", + " # Check for failures\n", + " for service, status in statuses.items():\n", + " if status.get('status') == 'JobStatus.FAILED':\n", + " raise Exception(f\"Service {service} failed: {status.get('message')}\")\n", + " \n", + " time.sleep(check_interval)\n", + " \n", + " except requests.exceptions.RequestException as e:\n", + " if \"Job not found\" in str(e):\n", + " print(\"Waiting for job to start...\")\n", + " time.sleep(check_interval)\n", + " continue\n", + " raise\n", + " except Exception as e:\n", + " print(f\"Error: {e}\")\n", + " raise" + ] + }, + { + "cell_type": "markdown", + "id": "1ddfee58-4bd2-40ad-a2aa-0c10d14a3e23", + "metadata": {}, + "source": [ + "Next you will generate a monologue using various analyst reports on NVIDIAs most recent financial earnings. You can also add your own PDFs to this Jupyter Lab and point to them in code below. Note that context PDFs are optional and can be used to provide additional context for the generation process. Additonally, you can provide a `guide` to help guide the generation process.\n", + "\n", + "**Agent Configuration**: The NVIDIA NIM microservices used for the long agent reasoning step are configurable under ``models.json``. By default the blueprint utilizes Build API endpoints, but this is customizable. \n", + "\n", + "For example, if you would like to leverage a locally-running Llama-3.1-8b-instruct model, ensure the ``local`` profile is selected before starting compose in AI Workbench, and adjust ``models.json`` to the following:\n", + "\n", + "```json\n", + "{\r\n", + " \"reasoning\": {\r\n", + " \"name\": \"meta/llama-3.1-8b-instruct\",\r\n", + " \"api_base\": \"http://pdf-to-podcast-local-nim-1:8000/v1\"\r\n", + " },\r\n", + " ...\r\n", + "}\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bfc3a2b6-8930-412d-8a6f-678fad2de97c", + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " print(\"Submitting podcast generation request...\")\n", + " job_id = generate_podcast(\n", + " target_pdf_paths=[\n", + " \"../samples/investorpres-main.pdf\",\n", + " ],\n", + " context_pdf_paths=[\n", + " \"../samples/bofa-context.pdf\",\n", + " \"../samples/citi-context.pdf\"\n", + " ],\n", + " name=\"NVIDIA Earnings Analysis\",\n", + " duration=15,\n", + " speaker_1_name=\"Alex\",\n", + " is_monologue=True,\n", + " guide=\"Focus on NVIDIA's earnings and the key points driving it's growth\"\n", + " )\n", + " print(f\"Job ID: {job_id}\")\n", + " wait_for_completion(job_id)\n", + "except Exception as e:\n", + " print(f\"Error: {e}\")" + ] + }, + { + "cell_type": "markdown", + "id": "a764dfff", + "metadata": {}, + "source": [ + "You can also generate a 2 person podcast by calling the same function but setting `is_monologue=False` and providing a `speaker_2_name`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c2a24fa8-b024-4a79-9214-d129a6b8c382", + "metadata": {}, + "outputs": [], + "source": [ + "!curl \"pdf-to-podcast-api-service-1:8002/output/{job_id}?userId=test-userid\" --output temp_audio.mp3\n", + "Audio(\"temp_audio.mp3\")" + ] + }, + { + "cell_type": "markdown", + "id": "32d8f529-09d4-43fc-8667-bfa48fd9d029", + "metadata": {}, + "source": [ + "## Understanding Your Generated Podcast\n", + "\n", + "After generating your podcast, you have access to a couple valuable outputs that provide insights into the generation process and content. Here's what endpoints you can use to explore:\n", + "\n", + "#### 1. The Transcript\n", + "```python\n", + "/saved_podcast/{job_id}/transcript\"\n", + "```\n", + "\n", + "The transcript provides a text version of your podcast, which is valuable for:\n", + "- Quick content review without audio playback\n", + "- Creating show notes or content summaries\n", + "- Finding and quoting specific discussion points\n", + "- Making content searchable and referenceable\n", + "- Ensuring accessibility of your content" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f01e8f10-6ca5-4143-9883-b479006155b8", + "metadata": {}, + "outputs": [], + "source": [ + "!curl \"pdf-to-podcast-api-service-1:8002/saved_podcast/{job_id}/transcript?userId=test-userid\"" + ] + }, + { + "cell_type": "markdown", + "id": "c4180391-05e9-4d5d-a66c-1e6dc6bb3163", + "metadata": {}, + "source": [ + "#### 2. Prompt and Generation History\n", + "```python\n", + "/saved_podcast/{job_id}/history\n", + "```\n", + "\n", + "The history reveals the AI's thought process, showing you:\n", + "- How the system analyzed and interpreted your PDFs\n", + "- Key topics and themes identified\n", + "- The structural decisions made for the conversation\n", + "- The reasoning behind content organization\n", + "- How different sections were prioritized and connected\n", + "\n", + "This is particularly useful for:\n", + "- Understanding how the AI makes decisions\n", + "- Improving future podcast generations\n", + "- Verifying content accuracy and relevance\n", + "- Fine-tuning and evals on your prompts" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cbe01fc0-e111-4fe0-9530-fe549e5892d0", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "!curl \"pdf-to-podcast-api-service-1:8002/saved_podcast/{job_id}/history?userId=test-userid\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5642285-2746-47fc-9b47-9f58d33301bb", + "metadata": {}, + "outputs": [], + "source": [ + "!curl \"pdf-to-podcast-api-service-1:8002/saved_podcast/{job_id}/metadata?userId=test-userid\"" + ] + }, + { + "cell_type": "markdown", + "id": "aaae77e9", + "metadata": {}, + "source": [ + "### Tools for Understanding Your Generated Podcast\n", + "\n", + "After generating your podcast, you can explore the generation process through several tools:\n", + "\n", + "#### 1. Jaeger Tracing Interface\n", + "Access Jaeger at `:16686` to:\n", + "- Visualize the complete request flow\n", + "- Debug processing bottlenecks\n", + "- Monitor service performance\n", + "- Track PDF processing and audio generation stages\n", + "\n", + "#### 2. MinIO Object Storage\n", + "Access MinIO at `:9001` with:\n", + "```\n", + "Username: minioadmin\n", + "Password: minioadmin\n", + "```\n", + "Here you can:\n", + "- Browse generated audio files\n", + "- Access intermediate processing artifacts\n", + "- View stored PDF documents\n", + "- Download or share content via presigned URLs\n", + "\n", + "#### 3. API Endpoints\n", + "You can access the API endpoint at `:8002/docs`." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/workbench/README.md b/workbench/README.md new file mode 100644 index 0000000..742103f --- /dev/null +++ b/workbench/README.md @@ -0,0 +1,60 @@ +# NVIDIA AI Workbench: Introduction [![Open In AI Workbench](https://img.shields.io/badge/Open_In-AI_Workbench-76B900)](https://ngc.nvidia.com/open-ai-workbench/aHR0cHM6Ly9naXRodWIuY29tL05WSURJQS1BSS1CbHVlcHJpbnRzL3BkZi10by1wb2RjYXN0) + + + + + +

+ :arrow_down: Download AI Workbench โ€ข + :book: Read the Docs โ€ข + :open_file_folder: Explore Example Projects โ€ข + :rotating_light: Facing Issues? Let Us Know! +

+ + +## Quickstart + +If you do not NVIDIA AI Workbench installed, first complete the installation for AI Workbench [here](https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/workbench/). + +Let's get started! + +1. (Optional) Fork this Project to your own GitHub namespace and copy the link. + + | :bulb: Tip | + | :-----------------------| + | We recommend forking this project to your own namespace as gives you write access for customization. You can still use this project without forking, but any changes you make will not be able to be pushed to the upstream repo as it is owned by NVIDIA. | + +2. Open the NVIDIA AI Workbench Desktop App. Select a location to work in. + +3. Clone this Project onto your desired machine by selecting **Clone Project** and providing the GitHub link. + +4. Wait for the project to build. You can expand the bottom **Building** indicator to view real-time build logs. + +5. When the build completes, set the following configurations. + + * `Environment` → `Secrets` → `Configure`. Specify the ``NVIDIA_API_KEY`` and ``ELEVENLABS_API_KEY`` Key. + * (Optional) Add a ``SENDER_EMAIL`` variable and a ``SENDER_EMAIL_PASSWORD`` secret to the project to use the email functionality on the frontend application. Gmail sender accounts are currently supported; you can create an App Password for your account [here](https://support.google.com/mail/answer/185833). + +6. Navigate to `Environment` → `Compose` and **Start** the Docker compose services. You can view progress under **Output** on the bottom left and selecting **Compose** logs from the dropdown. It may take a few minutes to pull and build the services. + + * The blueprint defaults to Build API endpoints. The services are ready when you see the following in the compose logs: + + ``` + celery-worker-1 | [2025-01-24 21:10:55,239: INFO/MainProcess] celery@ee170af41d1b ready. + ``` + + * To run the blueprint with a _locally-running_ Llama 3.1 8B Instruct NVIDIA NIM, be sure to specify the ``local`` profile from the profile dropdown before selecting **Start**. The services are ready when you see the following in the compose logs: + + ``` + local-nim-1 | INFO 2025-01-24 21:14:50.213 metrics.py:351] Avg prompt throughput: 0.0 tokens/s, Avg generation throughput: 0.0 tokens/s, Running: 0 reqs, Swapped: 0 reqs, Pending: 0 reqs, GPU KV cache usage: 0.0%, CPU KV cache usage: 0.0%. + ``` + + +8. (Option 1) Run the **Jupyter Notebook**. On the top right of the AI Workbench window, select **Open Jupyterlab**. Navigate to ``workbench/PDFtoPodcast.ipynb``, skip the setup sections, and get started immediately with the provided sample PDFs. + +9. (Option 2) Run the **Frontend application**. On the top right of the AI Workbench window, select **Open Frontend**. + + * Upload your own locally-stored, custom PDFs + * View and download your generated podcast locally + * Specify your agent parameters (local vs Build endpoints), + * (optional) Email your generated podcast to a recipient