Document
beanie.odm.documents
Document
class Document(LazyModel, SettersInterface, InheritanceInterface,
FindInterface, AggregateInterface, OtherGettersInterface)
Document Mapping class.
Fields:
id
- MongoDB document ObjectID "_id" field. Mapped to the PydanticObjectId class
Document.get
@classmethod
async def get(cls: Type["DocType"],
document_id: Any,
session: Optional[AsyncIOMotorClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs: Any) -> Optional["DocType"]
Get document by id, returns None if document does not exist
Arguments:
document_id
: PydanticObjectId - document id
session
: Optional[AsyncIOMotorClientSession] - motor session
ignore_cache
: bool - ignore cache (if it is turned on)
**pymongo_kwargs
: pymongo native parameters for find operation
Returns:
Union["Document", None]
Document.sync
async def sync(merge_strategy: MergeStrategy = MergeStrategy.remote)
Sync the document with the database
Arguments:
merge_strategy
: MergeStrategy - how to merge the document
Returns:
None
Document.insert
@wrap_with_actions(EventTypes.INSERT)
@save_state_after
@validate_self_before
async def insert(
*,
link_rule: WriteRules = WriteRules.DO_NOTHING,
session: Optional[AsyncIOMotorClientSession] = None,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None) -> Self
Insert the document (self) to the collection
Returns:
self
Document.create
async def create(session: Optional[AsyncIOMotorClientSession] = None) -> Self
The same as self.insert()
Returns:
self
Document.insert_one
@classmethod
async def insert_one(
cls: Type[DocType],
document: DocType,
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional["BulkWriter"] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING) -> Optional[DocType]
Insert one document to the collection
Arguments:
document
: Document - document to insert
session
: AsyncIOMotorClientSession - motor session
bulk_writer
: "BulkWriter" - Beanie bulk writer
link_rule
: WriteRules - how to manage link fields
Returns:
DocType
Document.insert_many
@classmethod
async def insert_many(cls: Type[DocType],
documents: Iterable[DocType],
session: Optional[AsyncIOMotorClientSession] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
**pymongo_kwargs: Any) -> InsertManyResult
Insert many documents to the collection
Arguments:
documents
: List["Document"] - documents to insert
session
: AsyncIOMotorClientSession - motor session
link_rule
: WriteRules - how to manage link fields
Returns:
InsertManyResult
Document.replace
@wrap_with_actions(EventTypes.REPLACE)
@save_state_after
@validate_self_before
async def replace(
ignore_revision: bool = False,
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None) -> Self
Fully update the document in the database
Arguments:
session
: Optional[AsyncIOMotorClientSession] - motor session.
ignore_revision
: bool - do force replace. Used when revision-based protection is turned on.
bulk_writer
: "BulkWriter" - Beanie bulk writer
Returns:
self
Document.save
@wrap_with_actions(EventTypes.SAVE)
@save_state_after
@validate_self_before
async def save(session: Optional[AsyncIOMotorClientSession] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
ignore_revision: bool = False,
**kwargs: Any) -> Self
Update an existing model in the database or
insert it if it does not yet exist.
Arguments:
session
: Optional[AsyncIOMotorClientSession] - motor session.
link_rule
: WriteRules - rules for how to deal with links on writing
ignore_revision
: bool - do force save.
Returns:
self
Document.save_changes
@saved_state_needed
@wrap_with_actions(EventTypes.SAVE_CHANGES)
@validate_self_before
async def save_changes(
ignore_revision: bool = False,
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None
) -> Optional[Self]
Save changes.
State management usage must be turned on
Arguments:
ignore_revision
: bool - ignore revision id, if revision is turned on
bulk_writer
: "BulkWriter" - Beanie bulk writer
Returns:
Optional[self]
Document.replace_many
@classmethod
async def replace_many(
cls: Type[DocType],
documents: List[DocType],
session: Optional[AsyncIOMotorClientSession] = None) -> None
Replace list of documents
Arguments:
documents
: List["Document"]
session
: Optional[AsyncIOMotorClientSession] - motor session.
Returns:
None
Document.update
@wrap_with_actions(EventTypes.UPDATE)
@save_state_after
async def update(*args: Union[Dict[Any, Any], Mapping[Any, Any]],
ignore_revision: bool = False,
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None,
skip_sync: Optional[bool] = None,
**pymongo_kwargs: Any) -> Self
Partially update the document in the database
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
session
: AsyncIOMotorClientSession - motor session.
ignore_revision
: bool - force update. Will update even if the revision id differs from the stored one
bulk_writer
: "BulkWriter" - Beanie bulk writer
pymongo_kwargs
: pymongo native parameters for update operation
Returns:
self
Document.update_all
@classmethod
def update_all(cls,
*args: Union[dict, Mapping],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> UpdateMany
Partially update all the documents
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
session
: AsyncIOMotorClientSession - motor session.
bulk_writer
: "BulkWriter" - Beanie bulk writer
**pymongo_kwargs
: pymongo native parameters for find operation
Returns:
UpdateMany query
Document.set
def set(expression: Dict[Union[ExpressionField, str, Any], Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_sync: Optional[bool] = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Set values
Example:
class Sample(Document):
    one: int

await Document.find(Sample.one == 1).set({Sample.one: 100})
Uses Set operator
Arguments:
expression
: Dict[Union[ExpressionField, str, Any], Any] - keys and values to set
session
: Optional[AsyncIOMotorClientSession] - motor session
bulk_writer
: Optional[BulkWriter] - bulk writer
skip_sync
: bool - skip doc syncing. Available for the direct instances only
Returns:
self
Document.current_date
def current_date(expression: Dict[Union[datetime, ExpressionField, str], Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_sync: Optional[bool] = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Set current date
Uses CurrentDate operator
Arguments:
expression
: Dict[Union[datetime, ExpressionField, str], Any]
session
: Optional[AsyncIOMotorClientSession] - motor session
bulk_writer
: Optional[BulkWriter] - bulk writer
skip_sync
: bool - skip doc syncing. Available for the direct instances only
Returns:
self
Document.inc
def inc(expression: Dict[Union[ExpressionField, float, int, str], Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_sync: Optional[bool] = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Increment
Example:
class Sample(Document):
    one: int

await Document.find(Sample.one == 1).inc({Sample.one: 100})
Uses Inc operator
Arguments:
expression
: Dict[Union[ExpressionField, float, int, str], Any]
session
: Optional[AsyncIOMotorClientSession] - motor session
bulk_writer
: Optional[BulkWriter] - bulk writer
skip_sync
: bool - skip doc syncing. Available for the direct instances only
Returns:
self
Document.delete
@wrap_with_actions(EventTypes.DELETE)
async def delete(session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
link_rule: DeleteRules = DeleteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None,
**pymongo_kwargs: Any) -> Optional[DeleteResult]
Delete the document
Arguments:
session
: Optional[AsyncIOMotorClientSession] - motor session.
bulk_writer
: "BulkWriter" - Beanie bulk writer
link_rule
: DeleteRules - rules for link fields
**pymongo_kwargs
: pymongo native parameters for delete operation
Returns:
Optional[DeleteResult] - pymongo DeleteResult instance.
Document.delete_all
@classmethod
async def delete_all(cls,
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> Optional[DeleteResult]
Delete all the documents
Arguments:
session
: Optional[AsyncIOMotorClientSession] - motor session.
bulk_writer
: "BulkWriter" - Beanie bulk writer
**pymongo_kwargs
: pymongo native parameters for delete operation
Returns:
Optional[DeleteResult] - pymongo DeleteResult instance.
Document.use_state_management
@classmethod
def use_state_management(cls) -> bool
Is state management turned on
Returns:
bool
Document.state_management_save_previous
@classmethod
def state_management_save_previous(cls) -> bool
Should we save the previous state after a commit to database
Returns:
bool
Document.state_management_replace_objects
@classmethod
def state_management_replace_objects(cls) -> bool
Should objects be replaced when using state management
Returns:
bool
Document.get_saved_state
def get_saved_state() -> Optional[Dict[str, Any]]
Saved state getter. It is a protected property.
Returns:
Optional[Dict[str, Any]] - saved state
Document.get_previous_saved_state
def get_previous_saved_state() -> Optional[Dict[str, Any]]
Previous state getter. It is a protected property.
Returns:
Optional[Dict[str, Any]] - previous state
Document.get_settings
@classmethod
def get_settings(cls) -> DocumentSettings
Get the document settings, which were created at
the initialization step
Returns:
DocumentSettings class
Document.inspect_collection
@classmethod
async def inspect_collection(
cls,
session: Optional[AsyncIOMotorClientSession] = None
) -> InspectionResult
Check whether the documents stored in the MongoDB collection
are compatible with the Document schema
Returns:
InspectionResult
Document.bulk_writer
@classmethod
def bulk_writer(cls,
session: Optional[AsyncIOMotorClientSession] = None,
ordered: bool = True,
bypass_document_validation: bool = False,
comment: Optional[Any] = None) -> BulkWriter
Returns a BulkWriter instance for handling bulk write operations.
Arguments:
session
: ClientSession - The session instance used for transactional operations.
ordered
: bool - If True (the default), requests will be performed on the server serially, in the order provided. If an error occurs, all remaining operations are aborted. If False, requests will be performed on the server in arbitrary order, possibly in parallel, and all operations will be attempted.
bypass_document_validation
: bool, optional - If True, allows the write to opt out of document-level validation. Default is False.
comment
: str, optional - A user-provided comment to attach to the BulkWriter.
Returns:
BulkWriter - An instance of BulkWriter configured with the provided settings.
Example Usage:
This method is typically used within an asynchronous context manager.
.. code-block:: python
async with Document.bulk_writer(ordered=True) as bulk:
    await Document.insert_one(Document(field="value"), bulk_writer=bulk)