Document
beanie.odm.documents
Document
class Document(LazyModel, SettersInterface, InheritanceInterface,
FindInterface, AggregateInterface, OtherGettersInterface)
Document Mapping class.
Fields:
id- MongoDB document ObjectID "_id" field. Mapped to the PydanticObjectId class
Document.get
@classmethod
async def get(cls: Type["DocType"],
document_id: Any,
session: Optional[AsyncClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs: Any) -> Optional["DocType"]
Get document by id, returns None if document does not exist
Arguments:
document_id: PydanticObjectId - document idsession: Optional[AsyncClientSession] - pymongo sessionignore_cache: bool - ignore cache (if it is turned on)**pymongo_kwargs: pymongo native parameters for find operationReturns:
Union["Document", None]
Document.sync
async def sync(merge_strategy: MergeStrategy = MergeStrategy.remote)
Sync the document with the database
Arguments:
merge_strategy: MergeStrategy - how to merge the documentReturns:
None
Document.insert
@wrap_with_actions(EventTypes.INSERT)
@save_state_after
@validate_self_before
async def insert(
*,
link_rule: WriteRules = WriteRules.DO_NOTHING,
session: Optional[AsyncClientSession] = None,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None) -> Self
Insert the document (self) to the collection
Arguments:
link_rule: WriteRules - if "WriteRules.WRITE", it will insert Link Documents to db.session: AsyncClientSession - pymongo sessionReturns:
self
Document.create
async def create(session: Optional[AsyncClientSession] = None) -> Self
The same as self.insert()
Returns:
self
Document.insert_one
@classmethod
async def insert_one(
cls: Type[DocType],
document: DocType,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional["BulkWriter"] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING) -> Optional[DocType]
Insert one document to the collection
Arguments:
document: Document - document to insertsession: AsyncClientSession - pymongo sessionbulk_writer: "BulkWriter" - Beanie bulk writerlink_rule: InsertRules - hot to manage link fieldsReturns:
DocType
Document.insert_many
@classmethod
async def insert_many(cls: Type[DocType],
documents: Iterable[DocType],
session: Optional[AsyncClientSession] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
**pymongo_kwargs: Any) -> InsertManyResult
Insert many documents to the collection
Arguments:
documents: List["Document"] - documents to insertsession: AsyncClientSession - pymongo sessionlink_rule: InsertRules - how to manage link fieldsReturns:
InsertManyResult
Document.replace
@wrap_with_actions(EventTypes.REPLACE)
@save_state_after
@validate_self_before
async def replace(
ignore_revision: bool = False,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None) -> Self
Fully update the document in the database
Arguments:
session: Optional[AsyncClientSession] - pymongo session.ignore_revision: bool - do force replace. Used when revision based protection is turned on.bulk_writer: "BulkWriter" - Beanie bulk writerReturns:
self
Document.save
@wrap_with_actions(EventTypes.SAVE)
@save_state_after
@validate_self_before
async def save(session: Optional[AsyncClientSession] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
ignore_revision: bool = False,
**kwargs: Any) -> Self
Update an existing model in the database or
insert it if it does not yet exist.
Arguments:
session: Optional[AsyncClientSession] - pymongo session.link_rule: WriteRules - rules how to deal with links on writingignore_revision: bool - do force save.Returns:
self
Document.save_changes
@saved_state_needed
@wrap_with_actions(EventTypes.SAVE_CHANGES)
@validate_self_before
async def save_changes(
ignore_revision: bool = False,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None
) -> Optional[Self]
Save changes.
State management usage must be turned on
Arguments:
ignore_revision: bool - ignore revision id, if revision is turned onbulk_writer: "BulkWriter" - Beanie bulk writerReturns:
Optional[self]
Document.replace_many
@classmethod
async def replace_many(cls: Type[DocType],
documents: List[DocType],
session: Optional[AsyncClientSession] = None) -> None
Replace list of documents
Arguments:
documents: List["Document"]session: Optional[AsyncClientSession] - pymongo session.Returns:
None
Document.update
@wrap_with_actions(EventTypes.UPDATE)
@save_state_after
async def update(*args: Union[Dict[Any, Any], Mapping[Any, Any]],
ignore_revision: bool = False,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None,
skip_sync: Optional[bool] = None,
**pymongo_kwargs: Any) -> Self
Partially update the document in the database
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.session: AsyncClientSession - pymongo session.ignore_revision: bool - force update. Will update even if revision id is not the same, as storedbulk_writer: "BulkWriter" - Beanie bulk writerpymongo_kwargs: pymongo native parameters for update operationReturns:
self
Document.update_all
@classmethod
def update_all(cls,
*args: Union[dict, Mapping],
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> UpdateMany
Partially update all the documents
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.session: AsyncClientSession - pymongo session.bulk_writer: "BulkWriter" - Beanie bulk writer**pymongo_kwargs: pymongo native parameters for find operationReturns:
UpdateMany query
Document.set
def set(expression: Dict[Union[ExpressionField, str, Any], Any],
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_sync: Optional[bool] = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Set values
Example:
class Sample(Document): one: int await Document.find(Sample.one == 1).set({Sample.one: 100})Uses Set operator
Arguments:
expression: Dict[Union[ExpressionField, str, Any], Any] - keys and values to setsession: Optional[AsyncClientSession] - pymongo sessionbulk_writer: Optional[BulkWriter] - bulk writerskip_sync: bool - skip doc syncing. Available for the direct instances onlyReturns:
self
Document.current_date
def current_date(expression: Dict[Union[datetime, ExpressionField, str], Any],
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_sync: Optional[bool] = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Set current date
Uses CurrentDate operator
Arguments:
expression: Dict[Union[datetime, ExpressionField, str], Any]session: Optional[AsyncClientSession] - pymongo sessionbulk_writer: Optional[BulkWriter] - bulk writerskip_sync: bool - skip doc syncing. Available for the direct instances onlyReturns:
self
Document.inc
def inc(expression: Dict[Union[ExpressionField, float, int, str], Any],
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_sync: Optional[bool] = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Increment
Example:
class Sample(Document): one: int await Document.find(Sample.one == 1).inc({Sample.one: 100})Uses Inc operator
Arguments:
expression: Dict[Union[ExpressionField, float, int, str], Any]session: Optional[AsyncClientSession] - pymongo sessionbulk_writer: Optional[BulkWriter] - bulk writerskip_sync: bool - skip doc syncing. Available for the direct instances onlyReturns:
self
Document.delete
@wrap_with_actions(EventTypes.DELETE)
async def delete(session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
link_rule: DeleteRules = DeleteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections,
str]]] = None,
**pymongo_kwargs: Any) -> Optional[DeleteResult]
Delete the document
Arguments:
session: Optional[AsyncClientSession] - pymongo session.bulk_writer: "BulkWriter" - Beanie bulk writerlink_rule: DeleteRules - rules for link fields**pymongo_kwargs: pymongo native parameters for delete operationReturns:
Optional[DeleteResult] - pymongo DeleteResult instance.
Document.delete_all
@classmethod
async def delete_all(cls,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> Optional[DeleteResult]
Delete all the documents
Arguments:
session: Optional[AsyncClientSession] - pymongo session.bulk_writer: "BulkWriter" - Beanie bulk writer**pymongo_kwargs: pymongo native parameters for delete operationReturns:
Optional[DeleteResult] - pymongo DeleteResult instance.
Document.use_state_management
@classmethod
def use_state_management(cls) -> bool
Is state management turned on
Returns:
bool
Document.state_management_save_previous
@classmethod
def state_management_save_previous(cls) -> bool
Should we save the previous state after a commit to database
Returns:
bool
Document.state_management_replace_objects
@classmethod
def state_management_replace_objects(cls) -> bool
Should objects be replaced when using state management
Returns:
bool
Document.get_saved_state
def get_saved_state() -> Optional[Dict[str, Any]]
Saved state getter. It is protected property.
Returns:
Optional[Dict[str, Any]] - saved state
Document.get_previous_saved_state
def get_previous_saved_state() -> Optional[Dict[str, Any]]
Previous state getter. It is a protected property.
Returns:
Optional[Dict[str, Any]] - previous state
Document.get_settings
@classmethod
def get_settings(cls) -> DocumentSettings
Get document settings, which was created on
the initialization step
Returns:
DocumentSettings class
Document.inspect_collection
@classmethod
async def inspect_collection(cls,
session: Optional[AsyncClientSession] = None
) -> InspectionResult
Check, if documents, stored in the MongoDB collection
are compatible with the Document schema
Arguments:
session: Optional[AsyncClientSession] - pymongo session The session instance used for transactional operations. Defaults to None.Returns:
InspectionResult
Document.bulk_writer
@classmethod
def bulk_writer(cls,
session: Optional[AsyncClientSession] = None,
ordered: bool = True,
bypass_document_validation: Optional[bool] = False,
comment: Optional[Any] = None) -> BulkWriter
Returns a BulkWriter instance for handling bulk write operations.
Arguments:
session: Optional[AsyncClientSession] - pymongo session. The session instance used for transactional operations.ordered: bool IfTrue(the default), requests will be performed on the server serially, in the order provided. If an error occurs, all remaining operations are aborted. IfFalse, requests will be performed on the server in arbitrary order, possibly in parallel, and all operations will be attempted.bypass_document_validation: bool, optional IfTrue, allows the write to opt-out of document-level validation. Default isFalse.comment: str, optional A user-provided comment to attach to the BulkWriter.Returns:
BulkWriter An instance of BulkWriter configured with the provided settings.
Example Usage:
This method is typically used within an asynchronous context manager.
.. code-block:: python
async with Document.bulk_writer(ordered=True) as bulk: await Document.insert_one(Document(field="value"), bulk_writer=bulk)