Connector checkpointing #3876
base: main

Conversation
Force-pushed from b0e50cf to f69739e.
```python
yield (doc, checkpoint, None)

if thread_ts:
    seen_thread_ts.add(thread_ts)
```
maybe actually still wrong?
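One way to make the dedup survive a resume (a sketch of the concern, with hypothetical names — the PR's actual connector code may differ): an in-memory `seen_thread_ts` set is lost when an attempt fails, so a run resumed from a checkpoint would re-yield threads unless the set travels inside the checkpoint itself.

```python
# Hypothetical sketch: dedup Slack threads across a checkpointed run.
# `seen_thread_ts` is persisted into the checkpoint dict so that a resumed
# run (which starts with a fresh in-memory set) stays deduplicated.
def poll_messages(messages, checkpoint):
    """Yield one doc per thread, skipping threads already in the checkpoint."""
    seen_thread_ts = set(checkpoint.get("seen_thread_ts", []))
    for msg in messages:
        thread_ts = msg.get("thread_ts") or msg["ts"]
        if thread_ts in seen_thread_ts:
            continue
        seen_thread_ts.add(thread_ts)
        # write the updated set back so a resume after failure skips this thread
        checkpoint["seen_thread_ts"] = sorted(seen_thread_ts)
        yield {"id": thread_ts, "text": msg.get("text", "")}
```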
```python
@abc.abstractmethod
def load_from_checkpoint(
    self,
    start: SecondsSinceUnixEpoch,
```
Add comment
```python
router = APIRouter(prefix="/manage")


@router.get("/admin/indexing-errors/{index_attempt_id}")
@router.get("/admin/index-attempt/{index_attempt_id}/errors")
```
TODO: delete this one
```python
    window_end: datetime,
) -> ConnectorCheckpoint:
    """Get the latest valid checkpoint for a given connector credential pair"""
    checkpoint_candidates = get_recent_completed_attempts_for_cc_pair(
```
add a limit to the number of tries for the same checkpoint
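The suggested cap could look something like this sketch (names and the limit of 3 are assumptions, not from the PR): count how many recent attempts already resumed from the same checkpoint, and fall back to a fresh one past the limit rather than retrying a possibly-poisoned checkpoint forever.

```python
# Hypothetical retry cap for checkpoint resumption: if the same checkpoint
# has been resumed MAX_TRIES_PER_CHECKPOINT times without completing, start
# from an empty checkpoint instead.
MAX_TRIES_PER_CHECKPOINT = 3  # assumed value
EMPTY_CHECKPOINT = {"checkpoint_pointer": None}

def select_checkpoint(recent_attempts):
    """recent_attempts: newest-first list of dicts with a 'checkpoint_pointer'."""
    if not recent_attempts:
        return EMPTY_CHECKPOINT
    latest = recent_attempts[0]["checkpoint_pointer"]
    if latest is None:
        return EMPTY_CHECKPOINT
    # how many consecutive-ish recent attempts already tried this checkpoint
    tries = sum(1 for a in recent_attempts if a["checkpoint_pointer"] == latest)
    if tries >= MAX_TRIES_PER_CHECKPOINT:
        return EMPTY_CHECKPOINT
    return {"checkpoint_pointer": latest}
```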
```python
    sa.Column("checkpoint_pointer", sa.String(), nullable=True),
)
op.add_column(
    "index_attempt",
```
this seems extremely specific to the concept of a poll start and end time. Are we sure checkpoints will always be time based?
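A shape that avoids baking time windows into the schema (a dependency-free sketch using a plain dataclass rather than the project's Pydantic model; field names are illustrative): store an opaque, connector-defined payload so cursor- or page-token-based connectors fit the same checkpoint type as time-window ones.

```python
# Hypothetical non-time-based checkpoint: the state is an opaque dict the
# connector defines, rather than dedicated poll start/end columns.
from dataclasses import dataclass, field

@dataclass
class ConnectorCheckpoint:
    has_more: bool = True
    # opaque, connector-defined state: a time window, a cursor, a page token...
    checkpoint_content: dict = field(default_factory=dict)

# a time-window connector and a cursor connector share one schema
time_cp = ConnectorCheckpoint(checkpoint_content={"window_start": 0, "window_end": 3600})
cursor_cp = ConnectorCheckpoint(checkpoint_content={"cursor": "abc123"})
```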
```python
@shared_task(
    name=OnyxCeleryTask.CHECK_FOR_CHECKPOINT_CLEANUP,
    soft_time_limit=300,
    bind=True,
```
is the bind used here? maybe we can leave it out?
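For context on the question: in Celery, `bind=True` passes the task instance as the first argument (like a method's `self`), which is only needed if the body uses `self.retry()`, `self.request`, etc. The semantics can be illustrated without Celery installed (everything below is a simulation, not the Celery API):

```python
# Simulated illustration of Celery's bind semantics: with bind=True the task
# function receives the task instance first; if the body never touches it,
# bind=True can be dropped.
class FakeTask:
    def __init__(self, fn, bind):
        self.fn, self.bind = fn, bind
    def run(self, *args):
        return self.fn(self, *args) if self.bind else self.fn(*args)

def shared_task(bind=False):
    def wrap(fn):
        return FakeTask(fn, bind)
    return wrap

@shared_task(bind=True)
def bound_task(self, x):
    return (type(self).__name__, x)   # uses self -> bind is needed

@shared_task()
def plain_task(x):
    return x * 2                      # never uses self -> bind can go
```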
```python
    _: User = Depends(current_curator_or_admin_user),
    db_session: Session = Depends(get_session),
) -> PaginatedReturn[IndexAttemptErrorPydantic]:
    total_count = count_index_attempt_errors_for_cc_pair(
```
docstring for endpoint?
```python
router = APIRouter(prefix="/manage")


@router.get("/admin/indexing-errors/{index_attempt_id}")
@router.get("/admin/index-attempt/{index_attempt_id}/errors")
```
docstring?
```diff
@@ -519,6 +510,7 @@ def index_doc_batch(
     new_docs=len([r for r in insertion_records if r.already_existed is False]),
     total_docs=len(filtered_documents),
     total_chunks=len(access_aware_chunks),
+    failures=vector_db_write_failures + embedding_falures,
```
`embedding_falures` -> `embedding_failures` (typo)
```python
if (failed_document is None and failed_entity is None) or (
    failed_document is not None and failed_entity is not None
):
    raise ValueError(
```
Should this be a Union type then?
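The reviewer's suggestion would look roughly like this sketch (plain dataclasses and invented class names, not the PR's actual models): replace two mutually exclusive optional fields plus a runtime XOR check with a single field typed as a union of the two cases, so "exactly one" is enforced by construction.

```python
# Hypothetical Union-typed failure model: exactly one failure subject is
# guaranteed by the type instead of a runtime check.
from dataclasses import dataclass
from typing import Union

@dataclass
class FailedDocument:
    document_id: str

@dataclass
class FailedEntity:
    entity_id: str

@dataclass
class ConnectorFailure:
    failed: Union[FailedDocument, FailedEntity]  # one subject, always present
    message: str

f = ConnectorFailure(failed=FailedEntity(entity_id="chan-42"), message="fetch failed")
```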
```python
        id=doc.id, semantic_id=doc.semantic_identifier, section_link=section_link
    )


class ConnectorCheckpoint(BaseModel):
```
maybe put in a check somewhere so that at least we can't write arbitrarily large checkpoints
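A minimal version of that guard (the cap value and function name are assumptions): measure the serialized size before the checkpoint is written anywhere and reject oversized payloads.

```python
# Hypothetical size guard: reject checkpoints whose serialized form exceeds
# a fixed byte cap before they are persisted.
import json

MAX_CHECKPOINT_BYTES = 1_000_000  # assumed cap, not from the PR

def validate_checkpoint_size(checkpoint_content: dict) -> int:
    """Return the serialized size in bytes, raising if the checkpoint is too large."""
    size = len(json.dumps(checkpoint_content).encode("utf-8"))
    if size > MAX_CHECKPOINT_BYTES:
        raise ValueError(
            f"Checkpoint is {size} bytes; max is {MAX_CHECKPOINT_BYTES}"
        )
    return size
```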
```python
    1. We have more than 3 failures AND
    2. Failures account for more than 10% of processed documents
    """
    if total_failures > 3 and total_failures / (document_count or 1) > 0.1:
```
could reduce nesting here for readability/flow
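An early-return version of the same check, as the comment suggests (function name is illustrative): guard clauses handle the non-failure cases first, leaving a single positive path.

```python
# Early-return refactor of the failure-ratio check: same logic, flatter flow.
def should_mark_attempt_failed(total_failures: int, document_count: int) -> bool:
    """Fail only when BOTH thresholds are exceeded (>3 failures, >10% of docs)."""
    if total_failures <= 3:
        return False
    # `or 1` guards against division by zero when no documents were processed
    if total_failures / (document_count or 1) <= 0.1:
        return False
    return True
```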
```diff
@@ -35,6 +35,19 @@
         "expires": BEAT_EXPIRES_DEFAULT,
     },
 },
+{
```
should put after the cloud generator comment
```python
except Exception as e:
    logger.exception(
        "Connector run exceptioned after elapsed time: "
        f"{time.time() - start_time} seconds"
```
time.monotonic probably better here?
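For context on the suggestion: `time.time()` can jump backward or forward if the wall clock is adjusted (NTP sync, manual changes), while `time.monotonic()` only moves forward, making it the safer choice for measuring elapsed time.

```python
# Measuring elapsed time with the monotonic clock, immune to wall-clock jumps.
import time

start = time.monotonic()
time.sleep(0.01)  # stand-in for the connector run
elapsed = time.monotonic() - start
print(f"Connector run took {elapsed:.3f} seconds")
```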
Description
https://linear.app/danswer/issue/DAN-1400/connector-checkpointing-continue-on-failure
How Has This Been Tested?
Backporting (check the box to trigger backport action)
Note: You have to check that the action passes, otherwise resolve the conflicts manually and tag the patches.