-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
493 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,19 @@ | ||
NoUserMessage = Exception("No user message") | ||
NoLLMResponseMessage = Exception("No LLM response message") | ||
InvokeError = Exception | ||
class NoUserMessage(Exception):
    """Error raised when there is no user message.

    Args:
        message: Text stored as the first exception argument. Defaults to
            "No user message".
        *args: Additional positional values appended to the exception's
            ``args`` tuple after ``message``.
    """

    def __init__(self, message="No user message", *args):
        details = (message,) + args
        super().__init__(*details)
|
||
|
||
class NoLLMResponseMessage(Exception):
    """Error raised when there is no response from the LLM.

    Args:
        message: Text stored as the first exception argument. Defaults to
            "No LLM response message".
        *args: Additional positional values appended to the exception's
            ``args`` tuple after ``message``.
    """

    def __init__(self, message="No LLM response message", *args):
        details = (message,) + args
        super().__init__(*details)
|
||
|
||
class InvokeError(Exception):
    """Error raised when an invocation error occurs.

    Args:
        message: Text stored as the first exception argument. Defaults to
            "Invocation error".
        *args: Additional positional values appended to the exception's
            ``args`` tuple after ``message``.
    """

    def __init__(self, message="Invocation error", *args):
        details = (message,) + args
        super().__init__(*details)
This file was deleted.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import List | ||
from ezpyai.llm.knowledge.knowledge_item import KnowledgeItem | ||
|
||
|
||
class Knowledge(ABC):
    """Abstract base class for knowledge objects.

    Declares the name/DSN accessors and the ``destroy``/``store`` operations
    that every concrete subclass has to implement before it can be
    instantiated.
    """

    @abstractmethod
    def get_name(self) -> str:
        """Return the name of this knowledge object."""

    @abstractmethod
    def get_dsn(self) -> str:
        """Return the DSN string of this knowledge object."""

    @abstractmethod
    def destroy(self) -> None:
        """Destroy this knowledge object; semantics are left to subclasses."""

    @abstractmethod
    def store(self, data_path: str) -> None:
        """Store the data found at ``data_path``; semantics are left to subclasses."""
|
||
|
||
class BaseKnowledge(Knowledge):
    """Concrete ``Knowledge`` that only records a name and a DSN.

    ``destroy`` and ``store`` do nothing and ``search`` returns no results.
    Subclasses are presumably expected to override them with real behaviour;
    confirm against the concrete implementations.
    """

    # Class-level defaults; both are overwritten per instance in __init__.
    _name: str = None
    _dsn: str = None

    def __init__(self, name: str, dsn: str) -> None:
        """Remember the given name and DSN.

        Args:
            name: Name returned by ``get_name``.
            dsn: DSN string returned by ``get_dsn``.
        """
        self._name = name
        self._dsn = dsn

    def __str__(self) -> str:
        """Return ``<ClassName>(name=..., dsn=...)`` using the runtime class name."""
        # type(self).__name__ keeps the label correct for subclasses that
        # inherit this method instead of always printing "BaseKnowledge".
        return f"{type(self).__name__}(name={self._name}, dsn={self._dsn})"

    def get_name(self) -> str:
        """Return the name passed to the constructor."""
        return self._name

    def get_dsn(self) -> str:
        """Return the DSN passed to the constructor."""
        return self._dsn

    def destroy(self) -> None:
        """Do nothing; there is nothing to tear down at this level."""

    def store(self, collection: str, data_path: str) -> None:
        """Do nothing; there is nowhere to store data at this level.

        Args:
            collection: Name of the target collection (unused here).
            data_path: Path of the data to store (unused here).
        """
        # NOTE(review): Knowledge.store declares only (data_path); this
        # override adds a leading ``collection`` argument. Confirm which
        # signature is intended before relying on the abstract one.

    def search(self, collection: str, query: str) -> List[KnowledgeItem]:
        """Return an empty result list.

        Args:
            collection: Name of the collection to search (unused here).
            query: Search query (unused here).

        Returns:
            An empty list, so callers can iterate the result safely and the
            declared return type is honoured.
        """
        return []
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
import os | ||
import json | ||
import magic | ||
import tempfile | ||
import zipfile | ||
import shutil | ||
import hashlib | ||
import logging | ||
import pandas as pd | ||
import xml.etree.ElementTree as ET | ||
import ezpyai.llm.knowledge.exceptions as exceptions | ||
|
||
from bs4 import BeautifulSoup | ||
from typing import Dict | ||
from PyPDF2 import PdfReader | ||
from docx import Document | ||
from ezpyai.llm.knowledge.knowledge_item import KnowledgeItem | ||
|
||
# MIME type strings used by KnowledgeGatherer._process_file. Each one is
# tested with a substring check (`in`) against the MIME type that `magic`
# reports for a file, in the order text, JSON, PDF, DOCX, CSV, HTML, XML, ZIP.
_MIMETYPE_TEXT = "text/plain" | ||
_MIMETYPE_JSON = "application/json" | ||
_MIMETYPE_PDF = "application/pdf" | ||
_MIMETYPE_DOCX = ( | ||
"application/vnd.openxmlformats-officedocument.wordprocessingml.document" | ||
) | ||
_MIMETYPE_ZIP = "application/zip" | ||
_MIMETYPE_CSV = "text/csv" | ||
_MIMETYPE_HTML = "text/html" | ||
_MIMETYPE_XML = "text/xml" | ||
|
||
|
||
class KnowledgeGatherer:
    """
    Gather knowledge from a single file or from all files under a directory.

    Plain-text, JSON, PDF, DOCX, CSV, HTML and XML files are converted to
    plain text; ZIP archives are extracted and their members are processed
    the same way. Each converted file is stored in ``_items`` under the
    SHA256 hex digest of its text, so two files that yield identical text
    occupy a single entry (the one processed last).

    Attributes:
        _items (Dict[str, KnowledgeItem]): Processed knowledge data keyed by
            the SHA256 hex digest of each item's content.
    """

    _items: Dict[str, KnowledgeItem]

    def __init__(self):
        """Initialize the KnowledgeGatherer with an empty _items dictionary."""
        self._items: Dict[str, KnowledgeItem] = {}

        logging.debug("KnowledgeGatherer initialized with an empty _items dictionary.")

    def __str__(self) -> str:
        """Return a short description listing the content hashes gathered so far."""
        return f"KnowledgeGatherer(data={self._items.keys()})"

    def _get_file_data(self, file_path: str, content: str) -> KnowledgeItem:
        """
        Build the KnowledgeItem for a file from its path and extracted text.

        Args:
            file_path (str): The path to the file.
            content (str): The plain-text content extracted from the file.

        Returns:
            KnowledgeItem: Item holding ``content`` plus metadata with the
                file's directory (``file_dir``), base name without extension
                (``file_name``) and extension including the dot (``file_ext``).
        """
        logging.debug(f"Getting data from file: {file_path}")

        file_dir = os.path.dirname(file_path)
        # The extension always belongs to the last path component, so one
        # split of the base name yields both the stem and the extension.
        file_name, file_ext = os.path.splitext(os.path.basename(file_path))

        return KnowledgeItem(
            content=content,
            metadata={
                "file_dir": file_dir,
                "file_name": file_name,
                "file_ext": file_ext,
            },
        )

    def _process_file(self, file_path: str):
        """
        Convert one file to plain text and add it to the _items dictionary.

        ZIP archives add nothing themselves; their members are processed
        through ``_process_zip`` instead.

        Args:
            file_path (str): The path to the file.

        Raises:
            UnsupportedFileTypeError: If the detected MIME type matches none
                of the supported types.
            FileReadError: If any other error occurs while reading or
                converting the file, including a failure inside a ZIP archive.
        """
        logging.debug(f"Processing file: {file_path}")

        # MIME detection runs outside the try block, so its own errors
        # propagate unwrapped.
        mime = magic.Magic(mime=True)
        mime_type = mime.from_file(file_path)

        try:
            if _MIMETYPE_TEXT in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_TEXT}")

                with open(file_path, "r", encoding="utf-8") as file:
                    content = file.read()
            elif _MIMETYPE_JSON in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_JSON}")

                # Parse and re-serialize so the stored text is normalized JSON.
                with open(file_path, "r", encoding="utf-8") as file:
                    content = json.dumps(json.load(file))
            elif _MIMETYPE_PDF in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_PDF}")

                reader = PdfReader(file_path)
                # extract_text() may yield nothing for a page; treat it as "".
                content = " ".join(page.extract_text() or "" for page in reader.pages)
            elif _MIMETYPE_DOCX in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_DOCX}")

                doc = Document(file_path)
                content = " ".join(para.text for para in doc.paragraphs if para.text)
            elif _MIMETYPE_CSV in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_CSV}")

                df = pd.read_csv(file_path)
                content = df.to_string()
            elif _MIMETYPE_HTML in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_HTML}")

                with open(file_path, "r", encoding="utf-8") as file:
                    soup = BeautifulSoup(file, "html.parser")
                    content = soup.get_text()
            elif _MIMETYPE_XML in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_XML}")

                tree = ET.parse(file_path)
                root = tree.getroot()
                content = "".join(root.itertext())
            elif _MIMETYPE_ZIP in mime_type:
                logging.debug(f"Processing file: {file_path} as {_MIMETYPE_ZIP}")

                self._process_zip(file_path)
                return
            else:
                raise exceptions.UnsupportedFileTypeError(
                    f"Unsupported file type for {file_path}"
                )
        except exceptions.UnsupportedFileTypeError:
            # Without this clause the generic handler below would swallow the
            # error and re-raise it as FileReadError, contradicting the
            # documented contract.
            raise
        except Exception as e:
            raise exceptions.FileReadError(
                f"Error reading {file_path}: {str(e)}"
            ) from e

        content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
        self._items[content_hash] = self._get_file_data(file_path, content)

        logging.debug(
            f"Processed file: {file_path} and added to data dictionary with key {content_hash}"
        )

    def _process_zip(self, zip_path: str):
        """
        Extract a zip file to a temporary directory and process its contents.

        The temporary directory is removed afterwards, whether or not
        processing succeeded.

        Args:
            zip_path (str): The path to the zip file.

        Raises:
            FileProcessingError: If an error occurs during the extraction or
                processing of the ZIP file.
        """
        logging.debug(f"Processing ZIP file: {zip_path}")

        # The context manager deletes the directory on exit, replacing the
        # manual mkdtemp()/rmtree() pairing.
        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                with zipfile.ZipFile(zip_path, "r") as zip_ref:
                    zip_ref.extractall(temp_dir)

                self._process_directory(temp_dir)
            except Exception as e:
                raise exceptions.FileProcessingError(
                    f"Error processing ZIP file {zip_path}: {str(e)}"
                ) from e

    def _process_directory(self, directory: str):
        """
        Recursively process all files in the specified directory.

        Processing stops at the first file that raises.

        Args:
            directory (str): The path to the directory.
        """
        logging.debug(f"Processing directory: {directory}")

        for root, _, files in os.walk(directory):
            for file in files:
                self._process_file(os.path.join(root, file))

    def gather(self, file_path: str):
        """
        Process a directory (recursively) or a single file.

        Args:
            file_path (str): The path to the file or directory.
        """
        logging.debug(f"Gathering data from: {file_path}")

        if os.path.isdir(file_path):
            self._process_directory(file_path)
        else:
            self._process_file(file_path)

    def get_items(self) -> Dict[str, KnowledgeItem]:
        """
        Return the collected knowledge data.

        Returns:
            Dict[str, KnowledgeItem]: The gatherer's own dictionary (not a
                copy), mapping the SHA256 hex digest of each item's content
                to its KnowledgeItem.
        """
        return self._items
Oops, something went wrong.