ollama/examples/langchain-python-rag-privategpt/ingest.py
dcasota 5528dd9d11
Error handling load_single_document() in ingest.py (#4852)
load_single_document() handles
- corrupt files
- empty (zero byte) files
- unsupported file extensions
2024-06-09 10:41:07 -07:00

170 lines
6.3 KiB
Python
Executable file
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import glob
from typing import List
from multiprocessing import Pool
from tqdm import tqdm
from langchain.document_loaders import (
CSVLoader,
EverNoteLoader,
PyMuPDFLoader,
TextLoader,
UnstructuredEmailLoader,
UnstructuredEPubLoader,
UnstructuredHTMLLoader,
UnstructuredMarkdownLoader,
UnstructuredODTLoader,
UnstructuredPowerPointLoader,
UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
from constants import CHROMA_SETTINGS
# Load environment variables
persist_directory = os.environ.get('PERSIST_DIRECTORY', 'db')
source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME', 'all-MiniLM-L6-v2')
chunk_size = 500
chunk_overlap = 50
# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
"""Wrapper to fallback to text/plain when default does not work"""
def load(self) -> List[Document]:
"""Wrapper adding fallback for elm without html"""
try:
try:
doc = UnstructuredEmailLoader.load(self)
except ValueError as e:
if 'text/html content not found in email' in str(e):
# Try plain text
self.unstructured_kwargs["content_source"]="text/plain"
doc = UnstructuredEmailLoader.load(self)
else:
raise
except Exception as e:
# Add file_path to exception message
raise type(e)(f"{self.file_path}: {e}") from e
return doc
# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
".csv": (CSVLoader, {}),
# ".docx": (Docx2txtLoader, {}),
".doc": (UnstructuredWordDocumentLoader, {}),
".docx": (UnstructuredWordDocumentLoader, {}),
".enex": (EverNoteLoader, {}),
".eml": (MyElmLoader, {}),
".epub": (UnstructuredEPubLoader, {}),
".html": (UnstructuredHTMLLoader, {}),
".md": (UnstructuredMarkdownLoader, {}),
".odt": (UnstructuredODTLoader, {}),
".pdf": (PyMuPDFLoader, {}),
".ppt": (UnstructuredPowerPointLoader, {}),
".pptx": (UnstructuredPowerPointLoader, {}),
".txt": (TextLoader, {"encoding": "utf8"}),
# Add more mappings for other file extensions and loaders as needed
}
def load_single_document(file_path: str) -> List[Document]:
if os.path.getsize(file_path) != 0:
filename, ext = os.path.splitext(file_path)
if ext in LOADER_MAPPING:
loader_class, loader_args = LOADER_MAPPING[ext]
try:
loader = loader_class(file_path, **loader_args)
if loader:
return loader.load()
except:
print(f"Corrupted file {file_path}. Ignoring it.")
else:
print(f"Unsupported file {file_path}. Ignoring it.")
else:
print(f"Empty file {file_path}. Ignoring it.")
def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
"""
Loads all documents from the source documents directory, ignoring specified files
"""
all_files = []
for ext in LOADER_MAPPING:
all_files.extend(
glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
)
filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]
with Pool(processes=os.cpu_count()) as pool:
results = []
with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
for i, docs in enumerate(pool.imap_unordered(load_single_document, filtered_files)):
if docs:
results.extend(docs)
pbar.update()
return results
def process_documents(ignored_files: List[str] = []) -> List[Document]:
"""
Load documents and split in chunks
"""
print(f"Loading documents from {source_directory}")
documents = load_documents(source_directory, ignored_files)
if not documents:
print("No new documents to load")
exit(0)
print(f"Loaded {len(documents)} new documents from {source_directory}")
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
texts = text_splitter.split_documents(documents)
print(f"Split into {len(texts)} chunks of text (max. {chunk_size} tokens each)")
return texts
def does_vectorstore_exist(persist_directory: str) -> bool:
"""
Checks if vectorstore exists
"""
if os.path.exists(os.path.join(persist_directory, 'index')):
if os.path.exists(os.path.join(persist_directory, 'chroma-collections.parquet')) and os.path.exists(os.path.join(persist_directory, 'chroma-embeddings.parquet')):
list_index_files = glob.glob(os.path.join(persist_directory, 'index/*.bin'))
list_index_files += glob.glob(os.path.join(persist_directory, 'index/*.pkl'))
# At least 3 documents are needed in a working vectorstore
if len(list_index_files) > 3:
return True
return False
def main():
# Create embeddings
embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)
if does_vectorstore_exist(persist_directory):
# Update and store locally vectorstore
print(f"Appending to existing vectorstore at {persist_directory}")
db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS)
collection = db.get()
texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
print(f"Creating embeddings. May take some minutes...")
db.add_documents(texts)
else:
# Create and store locally vectorstore
print("Creating new vectorstore")
texts = process_documents()
print(f"Creating embeddings. May take some minutes...")
db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory)
db.persist()
db = None
print(f"Ingestion complete! You can now run privateGPT.py to query your documents")
if __name__ == "__main__":
main()