Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Writer improving & bulk_writer method for document and possibility to bypass mongo document validation + comment parameter #1079

Merged
merged 26 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 112 additions & 58 deletions beanie/odm/bulk.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any, Dict, List, Mapping, Optional, Type, Union
from __future__ import annotations

from typing import TYPE_CHECKING, Any, List, Optional, Type, Union

from motor.motor_asyncio import AsyncIOMotorClientSession
from pydantic import BaseModel, Field
from pymongo import (
DeleteMany,
DeleteOne,
Expand All @@ -12,82 +13,135 @@
)
from pymongo.results import BulkWriteResult

from beanie.odm.utils.pydantic import IS_PYDANTIC_V2

if IS_PYDANTIC_V2:
from pydantic import ConfigDict


class Operation(BaseModel):
operation: Union[
Type[InsertOne],
Type[DeleteOne],
Type[DeleteMany],
Type[ReplaceOne],
Type[UpdateOne],
Type[UpdateMany],
]
first_query: Mapping[str, Any]
second_query: Optional[Dict[str, Any]] = None
pymongo_kwargs: Dict[str, Any] = Field(default_factory=dict)
object_class: Type

if IS_PYDANTIC_V2:
model_config = ConfigDict(
arbitrary_types_allowed=True,
)
else:
if TYPE_CHECKING:
from beanie import Document
from beanie.odm.union_doc import UnionDoc

class Config:
arbitrary_types_allowed = True
_WriteOp = Union[
InsertOne,
DeleteOne,
DeleteMany,
ReplaceOne,
UpdateOne,
UpdateMany,
]


class BulkWriter:
"""
A utility class for managing and executing bulk operations.

This class facilitates the efficient execution of multiple database operations
(e.g., inserts, updates, deletes, replacements) in a single batch. It supports asynchronous
context management and ensures that all queued operations are committed upon exiting the context.

Attributes:
session Optional[AsyncIOMotorClientSession]:
The motor session used for transactional operations.
Defaults to None, meaning no session is used.
ordered Optional[bool]:
Specifies whether operations are executed sequentially (default) or in parallel.
- If True, operations are performed serially, stopping at the first failure.
- If False, operations may be executed in arbitrary order, and all operations are attempted
regardless of individual failures.
bypass_document_validation Optional[bool]:
If True, document-level validation is bypassed for all operations
in the bulk write. This applies to MongoDB's schema validation rules, allowing documents that
do not meet validation criteria to be inserted or modified. Defaults to False.
comment Optional[Any]:
A user-provided comment attached to the bulk operation command, useful for
auditing and debugging purposes.
operations List[Union[DeleteMany, DeleteOne, InsertOne, ReplaceOne, UpdateMany, UpdateOne]]:
A list of MongoDB operations queued for bulk execution.

Parameters:
session Optional[AsyncIOMotorClientSession]: The motor session for transaction support.
Defaults to None (no session).
ordered Optional[bool]: Specifies whether operations are executed in sequence (True) or in parallel (False).
Defaults to True.
bypass_document_validation Optional[bool]: Allows the bulk operation to bypass document-level validation.
This is particularly useful when working with schemas that are being phased in or for bulk imports
where strict validation may not be necessary. Defaults to False.
comment Optional[Any]: A custom comment attached to the bulk operation.
Defaults to None.
"""

def __init__(
self,
session: Optional[AsyncIOMotorClientSession] = None,
ordered: bool = True,
bypass_document_validation: bool = False,
comment: Optional[Any] = None,
):
self.operations: List[Operation] = []
CAPITAINMARVEL marked this conversation as resolved.
Show resolved Hide resolved
self.operations: List[_WriteOp] = []
self.session = session
self.ordered = ordered
self.object_class: Optional[Type[Union[Document, UnionDoc]]] = None
self.bypass_document_validation = bypass_document_validation
CAPITAINMARVEL marked this conversation as resolved.
Show resolved Hide resolved
self.comment = comment
self._collection_name: str

async def __aenter__(self):
return self
CAPITAINMARVEL marked this conversation as resolved.
Show resolved Hide resolved

async def __aexit__(self, exc_type, exc, tb):
await self.commit()
CAPITAINMARVEL marked this conversation as resolved.
Show resolved Hide resolved
if exc_type is None:
await self.commit()

async def commit(self) -> Optional[BulkWriteResult]:
"""
Commit all the operations to the database
:return:
Commit all queued operations to the database.

Executes all queued operations in a single bulk write request. If there
are no operations to commit, it returns ``None``.

:return: Optional[BulkWriteResult]
The result of the bulk write operation if operations are committed.
Returns ``None`` if there are no operations to execute.
:rtype: Optional[BulkWriteResult]

:raises ValueError:
If the object_class is not specified before committing.
"""
obj_class = None
requests = []
if self.operations:
for op in self.operations:
if obj_class is None:
obj_class = op.object_class

if obj_class != op.object_class:
raise ValueError(
"All the operations should be for a single document model"
)
if op.operation in [InsertOne, DeleteOne]:
query = op.operation(op.first_query, **op.pymongo_kwargs) # type: ignore
else:
query = op.operation(
op.first_query,
op.second_query,
**op.pymongo_kwargs, # type: ignore
)
requests.append(query)

return await obj_class.get_motor_collection().bulk_write( # type: ignore
requests, session=self.session, ordered=self.ordered
if not self.operations:
return None
if not self.object_class:
raise ValueError(
"The document model class must be specified before committing operations."
)
return None
return await self.object_class.get_motor_collection().bulk_write(
self.operations,
ordered=self.ordered,
bypass_document_validation=self.bypass_document_validation,
session=self.session,
comment=self.comment,
)

def add_operation(self, operation: Operation):
def add_operation(
self,
object_class: Type[Union[Document, UnionDoc]],
operation: _WriteOp,
):
"""
Add an operation to the queue.

This method adds a MongoDB operation to the BulkWriter's operation queue.
All operations in the queue must belong to the same collection.

:param object_class: Type[Union[Document, UnionDoc]]
The document model class associated with the operation.
:param operation: Union[DeleteMany, DeleteOne, InsertOne, ReplaceOne, UpdateMany, UpdateOne]
The MongoDB operation to add to the queue.

:raises ValueError:
If the collection differs from the one already associated with the BulkWriter.
"""
if self.object_class is None:
self.object_class = object_class
self._collection_name = object_class.get_collection_name()
else:
if object_class.get_collection_name() != self._collection_name:
raise ValueError(
"All the operations should be for a same collection name"
)
self.operations.append(operation)
50 changes: 43 additions & 7 deletions beanie/odm/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
EventTypes,
wrap_with_actions,
)
from beanie.odm.bulk import BulkWriter, Operation
from beanie.odm.bulk import BulkWriter
from beanie.odm.cache import LRUCache
from beanie.odm.enums import SortDirection
from beanie.odm.fields import (
Expand Down Expand Up @@ -418,15 +418,14 @@ async def insert_one(
"Cascade insert with bulk writing not supported"
)
bulk_writer.add_operation(
Operation(
operation=InsertOne,
first_query=get_dict(
type(document),
InsertOne(
get_dict(
document,
to_db=True,
keep_nulls=document.get_settings().keep_nulls,
),
object_class=type(document),
)
)
),
)
return None

Expand Down Expand Up @@ -1214,6 +1213,43 @@ def link_from_id(cls, id: Any):
ref = DBRef(id=id, collection=cls.get_collection_name())
return Link(ref, document_class=cls)

def bulk_writer(
CAPITAINMARVEL marked this conversation as resolved.
Show resolved Hide resolved
self,
session: Optional[AsyncIOMotorClientSession] = None,
ordered: bool = True,
bypass_document_validation: bool = False,
comment: Optional[Any] = None,
):
CAPITAINMARVEL marked this conversation as resolved.
Show resolved Hide resolved
"""
Returns a BulkWriter instance for handling bulk write operations.

:param session: ClientSession
The session instance used for transactional operations.
:param ordered: bool
If ``True`` (the default), requests will be performed on the server serially, in the order provided. If an error
occurs, all remaining operations are aborted. If ``False``, requests will be performed on the server in
arbitrary order, possibly in parallel, and all operations will be attempted.
:param bypass_document_validation: bool, optional
If ``True``, allows the write to opt-out of document-level validation. Default is ``False``.
:param comment: str, optional
A user-provided comment to attach to the BulkWriter.

:returns: BulkWriter
An instance of BulkWriter configured with the provided settings.

Example Usage:
--------------
This method is typically used within an asynchronous context manager.

.. code-block:: python

async with Document.bulk_writer(ordered=True) as bulk:
await Document.insert_one(Document(field="value"), bulk_writer=bulk)
"""
return BulkWriter(
session, ordered, bypass_document_validation, comment
)


class DocumentWithSoftDelete(Document):
deleted_at: Optional[datetime] = None
Expand Down
4 changes: 2 additions & 2 deletions beanie/odm/interfaces/getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ def get_motor_collection(cls) -> AsyncIOMotorCollection:
return cls.get_settings().motor_collection

@classmethod
def get_collection_name(cls):
return cls.get_settings().name
def get_collection_name(cls) -> str:
return cls.get_settings().name # type: ignore

@classmethod
def get_bson_encoders(cls):
Expand Down
19 changes: 6 additions & 13 deletions beanie/odm/queries/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pymongo import DeleteOne as DeleteOnePyMongo
from pymongo.results import DeleteResult

from beanie.odm.bulk import BulkWriter, Operation
from beanie.odm.bulk import BulkWriter
from beanie.odm.interfaces.clone import CloneInterface
from beanie.odm.interfaces.session import SessionMethods

Expand Down Expand Up @@ -52,12 +52,8 @@ def __await__(
)
else:
self.bulk_writer.add_operation(
Operation(
operation=DeleteManyPyMongo,
first_query=self.find_query,
object_class=self.document_model,
pymongo_kwargs=self.pymongo_kwargs,
)
self.document_model,
DeleteManyPyMongo(self.find_query, **self.pymongo_kwargs),
)
return None

Expand All @@ -82,11 +78,8 @@ def __await__(
)
else:
self.bulk_writer.add_operation(
Operation(
operation=DeleteOnePyMongo,
first_query=self.find_query,
object_class=self.document_model,
pymongo_kwargs=self.pymongo_kwargs,
)
self.document_model,
DeleteOnePyMongo(self.find_query),
**self.pymongo_kwargs,
)
return None
15 changes: 7 additions & 8 deletions beanie/odm/queries/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pymongo.results import UpdateResult

from beanie.exceptions import DocumentNotFound
from beanie.odm.bulk import BulkWriter, Operation
from beanie.odm.bulk import BulkWriter
from beanie.odm.cache import LRUCache
from beanie.odm.enums import SortDirection
from beanie.odm.interfaces.aggregation_methods import AggregateMethods
Expand Down Expand Up @@ -968,18 +968,17 @@ async def replace_one(
return result
else:
bulk_writer.add_operation(
Operation(
operation=ReplaceOne,
first_query=self.get_filter_query(),
second_query=get_dict(
self.document_model,
ReplaceOne(
self.get_filter_query(),
get_dict(
document,
to_db=True,
exclude={"_id"},
keep_nulls=document.get_settings().keep_nulls,
),
object_class=self.document_model,
pymongo_kwargs=self.pymongo_kwargs,
)
**self.pymongo_kwargs,
),
)
return None

Expand Down
Loading
Loading