Add more airtable logging #3862
Changes from 3 commits
First file in the diff:

@@ -380,6 +380,15 @@ def index_doc_batch
         new_docs=0, total_docs=len(filtered_documents), total_chunks=0
     )

+    doc_descriptors = [
+        {
+            "doc_id": doc.id,
+            "doc_length": doc.get_total_char_length(),
+        }
+        for doc in ctx.updatable_docs
+    ]
+    logger.debug(f"Starting indexing process for documents: {doc_descriptors}")
Review comment: Does this make the line below ("Starting chunking") irrelevant?

Reply: imo it's fine to keep it in (lets you know that chunking is the first step), although no strong opinion.
     logger.debug("Starting chunking")
     chunks: list[DocAwareChunk] = chunker.chunk(ctx.updatable_docs)
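For context on what the new debug line emits: it is just a list of small dicts, one per updatable document. Below is a minimal, self-contained sketch of the pattern; the `Document` dataclass, its `get_total_char_length` body, and the example ids are simplified stand-ins for illustration, not the actual onyx models.

```python
import logging
from dataclasses import dataclass, field

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("indexing")


@dataclass
class Document:
    """Hypothetical stand-in for the real Document model."""

    id: str
    sections: list[str] = field(default_factory=list)

    def get_total_char_length(self) -> int:
        # Assumption: total length is the sum of all section texts.
        return sum(len(text) for text in self.sections)


updatable_docs = [
    Document(id="airtable__tblX__rec1", sections=["Name: Foo", "Status: Done"]),
    Document(id="airtable__tblX__rec2", sections=["Name: Bar"]),
]

doc_descriptors = [
    {"doc_id": doc.id, "doc_length": doc.get_total_char_length()}
    for doc in updatable_docs
]
# One line summarizing every document entering this indexing batch, e.g.
# [{'doc_id': 'airtable__tblX__rec1', 'doc_length': 21}, ...]
logger.debug(f"Starting indexing process for documents: {doc_descriptors}")
```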
Second file in the diff:
@@ -1,6 +1,8 @@
 import threading
 import time
 from collections.abc import Callable
+from concurrent.futures import as_completed
+from concurrent.futures import ThreadPoolExecutor
 from functools import wraps
 from typing import Any
@@ -11,6 +13,7 @@
 from requests import Response
 from retry import retry

+from onyx.configs.app_configs import INDEXING_EMBEDDING_MODEL_NUM_THREADS
 from onyx.configs.app_configs import LARGE_CHUNK_RATIO
 from onyx.configs.app_configs import SKIP_WARM_UP
 from onyx.configs.model_configs import BATCH_SIZE_ENCODE_CHUNKS
@@ -155,6 +158,7 @@ def _batch_encode_texts
         text_type: EmbedTextType,
         batch_size: int,
         max_seq_length: int,
+        num_threads: int = INDEXING_EMBEDDING_MODEL_NUM_THREADS,
     ) -> list[Embedding]:
         text_batches = batch_list(texts, batch_size)
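The new `num_threads` default comes from `INDEXING_EMBEDDING_MODEL_NUM_THREADS` in `app_configs`, which is not shown in this diff. A plausible sketch of such an env-driven setting follows; the variable name is from the diff, but the parsing and default value here are assumptions, not the actual onyx definition.

```python
import os

# Hypothetical definition; the real default and parsing in
# onyx/configs/app_configs.py may differ.
INDEXING_EMBEDDING_MODEL_NUM_THREADS = int(
    os.environ.get("INDEXING_EMBEDDING_MODEL_NUM_THREADS") or 8
)
```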
@@ -163,12 +167,15 @@ def _batch_encode_texts
         )

         embeddings: list[Embedding] = []
-        for idx, text_batch in enumerate(text_batches, start=1):
+
+        def process_batch(
+            batch_idx: int, text_batch: list[str]
+        ) -> tuple[int, list[Embedding]]:
             if self.callback:
                 if self.callback.should_stop():
                     raise RuntimeError("_batch_encode_texts detected stop signal")

-            logger.debug(f"Encoding batch {idx} of {len(text_batches)}")
+            logger.debug(f"Encoding batch {batch_idx} of {len(text_batches)}")
             embed_request = EmbedRequest(
                 model_name=self.model_name,
                 texts=text_batch,
@@ -185,10 +192,43 @@ def _batch_encode_texts
             )

             response = self._make_model_server_request(embed_request)
-            embeddings.extend(response.embeddings)
+            return batch_idx, response.embeddings

-            if self.callback:
-                self.callback.progress("_batch_encode_texts", 1)
+        # only multi thread if:
+        # 1. num_threads is greater than 1
+        # 2. we are using an API-based embedding model (provider_type is not None)
+        # 3. there are more than 1 batch (no point in threading if only 1)
+        if num_threads >= 1 and self.provider_type and len(text_batches) > 1:
+            with ThreadPoolExecutor(max_workers=num_threads) as executor:
+                future_to_batch = {
+                    executor.submit(process_batch, idx, batch): idx
+                    for idx, batch in enumerate(text_batches, start=1)
+                }
+
+                # Collect results in order
+                batch_results: list[tuple[int, list[Embedding]]] = []
+                for future in as_completed(future_to_batch):
+                    try:
+                        result = future.result()
+                        batch_results.append(result)
+                        if self.callback:
+                            self.callback.progress("_batch_encode_texts", 1)
+                    except Exception as e:
+                        logger.exception("Embedding model failed to process batch")
+                        raise e
+
+                # Sort by batch index and extend embeddings
+                batch_results.sort(key=lambda x: x[0])
+                for _, batch_embeddings in batch_results:
+                    embeddings.extend(batch_embeddings)
+        else:
+            # Original sequential processing
+            for idx, text_batch in enumerate(text_batches, start=1):
+                _, batch_embeddings = process_batch(idx, text_batch)
+                embeddings.extend(batch_embeddings)
+                if self.callback:
+                    self.callback.progress("_batch_encode_texts", 1)

         return embeddings

     def encode(
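The core pattern in this hunk is: submit every batch to a `ThreadPoolExecutor`, consume results with `as_completed` as they finish, then sort by batch index so the final embedding list stays aligned with the input order. The sketch below shows that pattern in isolation with a fake `process_batch`; it is not the onyx code.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_batch(batch_idx: int, text_batch: list[str]) -> tuple[int, list[str]]:
    # Stand-in for the real work: pretend each text is "embedded" by upper-casing it.
    return batch_idx, [text.upper() for text in text_batch]


text_batches = [["a", "b"], ["c"], ["d", "e", "f"]]
embeddings: list[str] = []

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its 1-based batch index.
    future_to_batch = {
        executor.submit(process_batch, idx, batch): idx
        for idx, batch in enumerate(text_batches, start=1)
    }

    # as_completed yields futures in completion order, which is arbitrary,
    # so collect (index, embeddings) pairs and restore input order afterwards.
    batch_results = [future.result() for future in as_completed(future_to_batch)]

batch_results.sort(key=lambda pair: pair[0])
for _, batch_embeddings in batch_results:
    embeddings.extend(batch_embeddings)

assert embeddings == ["A", "B", "C", "D", "E", "F"]
```

An alternative with the same ordering guarantee would be `executor.map`, which yields results in submission order, though it does not expose per-batch completion as conveniently for progress callbacks.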
@@ -256,7 +296,7 @@ def from_db_model
         )


-class RerankingModel:
+class RerankingModel:  #
Review comment: empty comment?

Reply: removed!
     def __init__(
         self,
         model_name: str,
Review comment: In the past I've seen that items yielded from generators in a loop are lazily evaluated (i.e. not processed until first accessed in the loop). Just wanted to double check that the timings printed here are indeed working correctly.

Reply: It does work, not sure exactly the standard behavior here though.
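On the lazy-evaluation question in general terms: a generator's body only runs when the consumer requests the next item, so a timestamp taken before the `for` loop measures almost nothing, while log lines emitted inside the loop body do reflect when each item was actually produced. A small generic illustration (not the onyx code):

```python
import time


def slow_items():
    for i in range(3):
        time.sleep(0.5)  # work happens only when the consumer requests the item
        yield i


gen = slow_items()  # returns instantly; no work has run yet
start = time.monotonic()
for item in gen:
    # Each iteration blocks ~0.5s while the generator produces the item,
    # so timing/logging inside the loop captures the real per-item cost.
    print(f"got {item} after {time.monotonic() - start:.2f}s")
```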