Connector checkpointing #3876

Open · wants to merge 19 commits into main

Conversation

@Weves (Contributor) commented Feb 2, 2025

Description

https://linear.app/danswer/issue/DAN-1400/connector-checkpointing-continue-on-failure
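
The gist: connectors get an interface that resumes from a checkpoint and records per-document failures instead of aborting the whole run. A rough sketch of that shape, pieced together from the hunks discussed below; the class name, field names, and exact signature here are assumptions, not the PR's actual definitions:

import abc
from collections.abc import Iterator
from typing import Any

from pydantic import BaseModel


class ConnectorCheckpoint(BaseModel):
    # opaque, connector-specific resume state (field names assumed)
    checkpoint_content: dict[str, Any] = {}
    has_more: bool = True


class CheckpointConnector(abc.ABC):
    @abc.abstractmethod
    def load_from_checkpoint(
        self,
        start: float,  # SecondsSinceUnixEpoch in the PR
        end: float,
        checkpoint: ConnectorCheckpoint,
    ) -> Iterator[tuple[Any, ConnectorCheckpoint, Exception | None]]:
        """Yield (document, checkpoint, failure) tuples so a run can pick up
        from the last checkpoint and keep indexing past individual failures."""
        ...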

How Has This Been Tested?

  • New integration tests
  • Tested the Slack connector locally with induced failures and verified that it indexes the same number of docs as the old connector
  • Tested that the Google Drive connector indexes the same number of docs

Backporting (check the box to trigger backport action)

Note: Verify that the backport action passes; otherwise, resolve the conflicts manually and tag the patches.

  • This PR should be backported (make sure to check that the backport attempt succeeds)
  • [Optional] Override Linear Check

vercel bot commented Feb 2, 2025

The latest updates on your projects (Vercel for Git):

internal-search: ✅ Ready (updated Feb 2, 2025 11:04pm UTC)

@Weves marked this pull request as ready for review February 2, 2025 22:01
yield (doc, checkpoint, None)

if thread_ts:
    seen_thread_ts.add(thread_ts)

@Weves (Contributor, Author) commented:
maybe actually still wrong?

@abc.abstractmethod
def load_from_checkpoint(
    self,
    start: SecondsSinceUnixEpoch,

router = APIRouter(prefix="/manage")


@router.get("/admin/indexing-errors/{index_attempt_id}")
@router.get("/admin/index-attempt/{index_attempt_id}/errors")

@Weves (Contributor, Author) commented:
TODO: delete this one

    window_end: datetime,
) -> ConnectorCheckpoint:
    """Get the latest valid checkpoint for a given connector credential pair"""
    checkpoint_candidates = get_recent_completed_attempts_for_cc_pair(

@Weves (Contributor, Author) commented:
add a limit to the number of tries for the same checkpoint
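
One possible shape for that cap, as a sketch (every name below is hypothetical, not from this PR): treat a checkpoint as exhausted once the last few attempts all resumed from it, then fall back to a fresh start.

from dataclasses import dataclass

MAX_TRIES_PER_CHECKPOINT = 3  # assumed cap


@dataclass
class AttemptInfo:
    checkpoint_pointer: str | None


def exceeded_checkpoint_retry_limit(
    recent_attempts: list[AttemptInfo],  # newest first
    checkpoint_pointer: str | None,
) -> bool:
    """True if the newest MAX_TRIES_PER_CHECKPOINT attempts all resumed from
    this same checkpoint, i.e. repeated runs keep dying at the same spot."""
    newest = recent_attempts[:MAX_TRIES_PER_CHECKPOINT]
    return len(newest) == MAX_TRIES_PER_CHECKPOINT and all(
        a.checkpoint_pointer == checkpoint_pointer for a in newest
    )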

    sa.Column("checkpoint_pointer", sa.String(), nullable=True),
)
op.add_column(
    "index_attempt",

Reviewer (Contributor) commented:
this seems extremely specific to the concept of a poll start and end time. Are we sure checkpoints will always be time-based?
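
If checkpoints shouldn't be tied to a poll window, a more generic schema would be a single opaque column holding whatever state the connector needs. A hedged Alembic sketch; the column name and the JSONB choice are assumptions:

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql


def upgrade() -> None:
    # one opaque state blob instead of time-window-specific columns
    op.add_column(
        "index_attempt",
        sa.Column("checkpoint_state", postgresql.JSONB(), nullable=True),
    )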

@shared_task(
    name=OnyxCeleryTask.CHECK_FOR_CHECKPOINT_CLEANUP,
    soft_time_limit=300,
    bind=True,

Reviewer (Contributor) commented:
is the bind used here? maybe we can leave it out?
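
For reference, bind=True only matters if the task body needs the task instance (e.g. for self.retry or self.request); if it doesn't, both the option and the self parameter can go. A minimal sketch with an assumed task name:

from celery import shared_task


@shared_task(name="check_for_checkpoint_cleanup", soft_time_limit=300)
def check_for_checkpoint_cleanup() -> None:
    # no `self` parameter once bind is dropped
    ...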

    _: User = Depends(current_curator_or_admin_user),
    db_session: Session = Depends(get_session),
) -> PaginatedReturn[IndexAttemptErrorPydantic]:
    total_count = count_index_attempt_errors_for_cc_pair(

Reviewer (Contributor) commented:
docstring for endpoint?


router = APIRouter(prefix="/manage")


@router.get("/admin/indexing-errors/{index_attempt_id}")
@router.get("/admin/index-attempt/{index_attempt_id}/errors")

Reviewer (Contributor) commented:
docstring?

@@ -519,6 +510,7 @@ def index_doc_batch(
new_docs=len([r for r in insertion_records if r.already_existed is False]),
total_docs=len(filtered_documents),
total_chunks=len(access_aware_chunks),
failures=vector_db_write_failures + embedding_falures,

Reviewer (Contributor) commented:
typo: embedding_falures -> embedding_failures

if (failed_document is None and failed_entity is None) or (
    failed_document is not None and failed_entity is not None
):
    raise ValueError(

Reviewer (Contributor) commented:
Should this be a Union type then?
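
A sketch of the Union-style alternative, with hypothetical model names: the type itself guarantees exactly one of the document/entity targets is set, so the runtime check disappears.

from pydantic import BaseModel


class DocumentFailure(BaseModel):
    document_id: str


class EntityFailure(BaseModel):
    entity_id: str


class ConnectorFailure(BaseModel):
    # exactly one failure target, enforced by the type
    failed: DocumentFailure | EntityFailure
    failure_message: str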

id=doc.id, semantic_id=doc.semantic_identifier, section_link=section_link
)

class ConnectorCheckpoint(BaseModel):

Reviewer (Contributor) commented:
maybe put in a check somewhere so that at least we can't write arbitrarily large checkpoints
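
One possible guard, as a sketch; the byte limit and helper name are assumptions, and the serialization call would need to match the pydantic version in use:

from pydantic import BaseModel

MAX_CHECKPOINT_SIZE_BYTES = 1_000_000  # assumed ~1 MB cap


def ensure_checkpoint_size(checkpoint: BaseModel) -> None:
    # pydantic v1 .json(); on v2 this would be .model_dump_json()
    serialized = checkpoint.json()
    if len(serialized.encode("utf-8")) > MAX_CHECKPOINT_SIZE_BYTES:
        raise ValueError(
            f"Checkpoint exceeds {MAX_CHECKPOINT_SIZE_BYTES} bytes; refusing to persist it"
        )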

1. We have more than 3 failures AND
2. Failures account for more than 10% of processed documents
"""
if total_failures > 3 and total_failures / (document_count or 1) > 0.1:

Reviewer (Contributor) commented:
could reduce nesting here for readability/flow
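
One way to flatten it, as a sketch (function and parameter names assumed):

def should_fail_index_attempt(total_failures: int, document_count: int) -> bool:
    """Fail only when there are more than 3 failures AND they account for
    more than 10% of processed documents."""
    if total_failures <= 3:
        return False
    return total_failures / (document_count or 1) > 0.1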

@@ -35,6 +35,19 @@
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{

Reviewer (Contributor) commented:
this should go after the cloud generator comment

except Exception as e:
    logger.exception(
        "Connector run exceptioned after elapsed time: "
        f"{time.time() - start_time} seconds"

Reviewer (Contributor) commented:
time.monotonic probably better here?
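
For reference, time.monotonic() is unaffected by system clock adjustments, which makes it the safer choice for measuring elapsed time; a minimal sketch:

import time

start_time = time.monotonic()
# ... run the connector ...
elapsed_seconds = time.monotonic() - start_time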
