Document
beanie.odm.documents
Document
class Document(LazyModel, SettersInterface, InheritanceInterface,
FindInterface, AggregateInterface, OtherGettersInterface)
Document Mapping class.
Fields:
id - MongoDB document ObjectID "_id" field. Mapped to the PydanticObjectId class
Document.get
@classmethod
async def get(cls: type["DocType"],
document_id: Any,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any) -> "DocType | None"
Get document by id, returns None if document does not exist
Arguments:
document_id: PydanticObjectId - document id
session: Optional[AsyncClientSession] - pymongo session
ignore_cache: bool - ignore cache (if it is turned on)
**pymongo_kwargs: pymongo native parameters for find operation
Returns:
Union["Document", None]
Document.sync
async def sync(merge_strategy: MergeStrategy = MergeStrategy.remote)
Sync the document with the database
Arguments:
merge_strategy: MergeStrategy - how to merge the document
Returns:
None
Document.insert
@validate_self_before
@wrap_with_actions(EventTypes.INSERT)
@save_state_after
async def insert(
*,
link_rule: WriteRules = WriteRules.DO_NOTHING,
session: AsyncClientSession | None = None,
skip_actions: list[ActionDirections | str] | None = None) -> Self
Insert the document (self) to the collection
Arguments:
link_rule: WriteRules - if "WriteRules.WRITE", it will insert Link Documents to db.
session: AsyncClientSession - pymongo session
Returns:
self
Document.create
async def create(session: AsyncClientSession | None = None) -> Self
The same as self.insert()
Returns:
self
Document.insert_one
@classmethod
async def insert_one(
cls: type[DocType],
document: DocType,
session: AsyncClientSession | None = None,
bulk_writer: "BulkWriter | None" = None,
link_rule: WriteRules = WriteRules.DO_NOTHING) -> DocType | None
Insert one document to the collection
Arguments:
document: Document - document to insert
session: AsyncClientSession - pymongo session
bulk_writer: "BulkWriter" - Beanie bulk writer
link_rule: WriteRules - how to manage link fields
Returns:
DocType
Document.insert_many
@classmethod
async def insert_many(cls: type[DocType],
documents: Iterable[DocType],
session: AsyncClientSession | None = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
**pymongo_kwargs: Any) -> InsertManyResult
Insert many documents to the collection
Arguments:
documents: List["Document"] - documents to insert
session: AsyncClientSession - pymongo session
link_rule: WriteRules - how to manage link fields
Returns:
InsertManyResult
Document.replace
@validate_self_before
@wrap_with_actions(EventTypes.REPLACE)
@save_state_after
async def replace(
ignore_revision: bool = False,
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: list[ActionDirections | str] | None = None) -> Self
Fully update the document in the database
Arguments:
session: Optional[AsyncClientSession] - pymongo session.
ignore_revision: bool - do force replace. Used when revision based protection is turned on.
bulk_writer: "BulkWriter" - Beanie bulk writer
Returns:
self
Document.save
@validate_self_before
@wrap_with_actions(EventTypes.SAVE)
@save_state_after
async def save(session: AsyncClientSession | None = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
ignore_revision: bool = False,
**kwargs: Any) -> Self
Update an existing model in the database or
insert it if it does not yet exist.
Arguments:
session: Optional[AsyncClientSession] - pymongo session.
link_rule: WriteRules - rules for how to deal with links on writing
ignore_revision: bool - do force save.
Returns:
self
Document.save_changes
@saved_state_needed
@validate_self_before
@wrap_with_actions(EventTypes.SAVE_CHANGES)
async def save_changes(
ignore_revision: bool = False,
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
skip_actions: list[ActionDirections | str] | None = None
) -> Self | None
Save changes.
State management usage must be turned on
Arguments:
ignore_revision: bool - ignore revision id, if revision is turned on
bulk_writer: "BulkWriter" - Beanie bulk writer
Returns:
Optional[self]
Document.replace_many
@classmethod
async def replace_many(cls: type[DocType],
documents: list[DocType],
session: AsyncClientSession | None = None) -> None
Replace list of documents
Arguments:
documents: List["Document"]
session: Optional[AsyncClientSession] - pymongo session.
Returns:
None
Document.update
@save_state_after
async def update(*args: dict[Any, Any] | Mapping[Any, Any],
ignore_revision: bool = False,
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
skip_actions: list[ActionDirections | str] | None = None,
skip_sync: bool | None = None,
**pymongo_kwargs: Any) -> Self
Partially update the document in the database
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
session: AsyncClientSession - pymongo session.
ignore_revision: bool - force update. Will update even if the revision id is not the same as the stored one
bulk_writer: "BulkWriter" - Beanie bulk writer
pymongo_kwargs: pymongo native parameters for update operation
Returns:
self
Document.update_all
@classmethod
def update_all(cls,
*args: dict | Mapping,
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
**pymongo_kwargs: Any) -> UpdateMany
Partially update all the documents
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
session: AsyncClientSession - pymongo session.
bulk_writer: "BulkWriter" - Beanie bulk writer
**pymongo_kwargs: pymongo native parameters for find operation
Returns:
UpdateMany query
Document.set
def set(expression: dict[ExpressionField | str | Any, Any],
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
skip_sync: bool | None = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Set values
Example:
class Sample(Document):
    one: int

await Document.find(Sample.one == 1).set({Sample.one: 100})

Uses Set operator
Arguments:
expression: Dict[Union[ExpressionField, str, Any], Any] - keys and values to set
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter] - bulk writer
skip_sync: bool - skip doc syncing. Available for the direct instances only
Returns:
self
Document.current_date
def current_date(expression: dict[datetime | ExpressionField | str, Any],
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
skip_sync: bool | None = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Set current date
Uses CurrentDate operator
Arguments:
expression: Dict[Union[datetime, ExpressionField, str], Any]
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter] - bulk writer
skip_sync: bool - skip doc syncing. Available for the direct instances only
Returns:
self
Document.inc
def inc(expression: dict[ExpressionField | float | int | str, Any],
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
skip_sync: bool | None = None,
**kwargs: Any) -> Coroutine[None, None, Self]
Increment
Example:
class Sample(Document):
    one: int

await Document.find(Sample.one == 1).inc({Sample.one: 100})

Uses Inc operator
Arguments:
expression: Dict[Union[ExpressionField, float, int, str], Any]
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter] - bulk writer
skip_sync: bool - skip doc syncing. Available for the direct instances only
Returns:
self
Document.delete
@wrap_with_actions(EventTypes.DELETE)
async def delete(session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
link_rule: DeleteRules = DeleteRules.DO_NOTHING,
skip_actions: list[ActionDirections | str] | None = None,
**pymongo_kwargs: Any) -> DeleteResult | None
Delete the document
Arguments:
session: Optional[AsyncClientSession] - pymongo session.
bulk_writer: "BulkWriter" - Beanie bulk writer
link_rule: DeleteRules - rules for link fields
**pymongo_kwargs: pymongo native parameters for delete operation
Returns:
Optional[DeleteResult] - pymongo DeleteResult instance.
Document.delete_all
@classmethod
async def delete_all(cls,
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
with_children: bool = False,
**pymongo_kwargs: Any) -> DeleteResult | None
Delete all the documents
Arguments:
session: Optional[AsyncClientSession] - pymongo session.
bulk_writer: "BulkWriter" - Beanie bulk writer
**pymongo_kwargs: pymongo native parameters for delete operation
Returns:
Optional[DeleteResult] - pymongo DeleteResult instance.
Document.use_state_management
@classmethod
def use_state_management(cls) -> bool
Is state management turned on
Returns:
bool
Document.state_management_save_previous
@classmethod
def state_management_save_previous(cls) -> bool
Should we save the previous state after a commit to database
Returns:
bool
Document.state_management_replace_objects
@classmethod
def state_management_replace_objects(cls) -> bool
Should objects be replaced when using state management
Returns:
bool
Document.get_saved_state
def get_saved_state() -> dict[str, Any] | None
Saved state getter. It is a protected property.
Returns:
Optional[Dict[str, Any]] - saved state
Document.get_previous_saved_state
def get_previous_saved_state() -> dict[str, Any] | None
Previous state getter. It is a protected property.
Returns:
Optional[Dict[str, Any]] - previous state
Document.get_settings
@classmethod
def get_settings(cls) -> DocumentSettings
Get the document settings, which were created during
the initialization step
Returns:
DocumentSettings class
Document.inspect_collection
@classmethod
async def inspect_collection(cls,
session: AsyncClientSession | None = None
) -> InspectionResult
Check whether the documents stored in the MongoDB collection
are compatible with the Document schema
Arguments:
session: Optional[AsyncClientSession] - pymongo session. The session instance used for transactional operations. Defaults to None.
Returns:
InspectionResult
Document.fetch_link
async def fetch_link(field: str | Any)
Fetch a linked document by field name.
Handles forward Links, BackLinks, and lists of either.
Document.bulk_writer
@classmethod
def bulk_writer(cls,
session: AsyncClientSession | None = None,
ordered: bool = True,
bypass_document_validation: bool | None = False,
comment: Any | None = None) -> BulkWriter
Returns a BulkWriter instance for handling bulk write operations.
Arguments:
session: Optional[AsyncClientSession] - pymongo session. The session instance used for transactional operations.
ordered: bool - If True (the default), requests will be performed on the server serially, in the order provided. If an error occurs, all remaining operations are aborted. If False, requests will be performed on the server in arbitrary order, possibly in parallel, and all operations will be attempted.
bypass_document_validation: bool, optional - If True, allows the write to opt-out of document-level validation. Default is False.
comment: str, optional - A user-provided comment to attach to the BulkWriter.
Returns:
BulkWriter - An instance of BulkWriter configured with the provided settings.
Example Usage: This method is typically used within an asynchronous context manager.
.. code-block:: python
async with Document.bulk_writer(ordered=True) as bulk:
    await Document.insert_one(Document(field="value"), bulk_writer=bulk)
DocumentWithSoftDelete
class DocumentWithSoftDelete(Document)
Implements soft deletion for Beanie documents.
Instead of permanently removing a document from the database, this subclass marks the document as deleted by setting the optional
deleted_at field to the current UTC timestamp. Normal query operations automatically exclude such documents, while special query methods can retrieve them when needed.
Fields:
deleted_at - marks the document as deleted by setting this field to the current UTC timestamp
DocumentWithSoftDelete.is_deleted
def is_deleted() -> bool
Returns whether the document has been soft-deleted.
Returns:
bool
True if the document's deleted_at field is set; otherwise False.
DocumentWithSoftDelete.hard_delete
async def hard_delete(session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
link_rule: DeleteRules = DeleteRules.DO_NOTHING,
skip_actions: list[ActionDirections | str] | None = None,
**pymongo_kwargs: Any) -> DeleteResult | None
Permanently deletes the document from the database, unlike :meth:`delete`.
Arguments:
session: Optional[AsyncClientSession] - The MongoDB async session used for the operation.
bulk_writer: Optional[BulkWriter] - A bulk writer used for batching multiple write operations.
link_rule: DeleteRules - Determines how linked documents are handled. Defaults to DeleteRules.DO_NOTHING.
skip_actions: Optional[List[Union[ActionDirections, str]]] - A list of lifecycle actions to skip during deletion.
**pymongo_kwargs: Any - Additional keyword arguments forwarded to PyMongo.
Returns:
Optional[DeleteResult] - The result of the PyMongo delete operation, if available.
DocumentWithSoftDelete.delete
async def delete(session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
link_rule: DeleteRules = DeleteRules.DO_NOTHING,
skip_actions: list[ActionDirections | str] | None = None,
**pymongo_kwargs) -> DeleteResult | None
Overrides the base :meth:`delete`.
Marks the document as deleted by setting the deleted_at timestamp.
The document remains in the database but is excluded from all standard queries such as :meth:`find`, :meth:`find_one`, and :meth:`find_many`.
Arguments:
session: Optional[AsyncClientSession] - The MongoDB async session used for the operation.
bulk_writer: Optional[BulkWriter] - A bulk writer used for batching multiple write operations.
link_rule: DeleteRules - Determines how linked documents are handled. Defaults to DeleteRules.DO_NOTHING.
skip_actions: Optional[List[Union[ActionDirections, str]]] - A list of lifecycle actions to skip during deletion.
**pymongo_kwargs: Any - Additional keyword arguments forwarded to PyMongo.
Returns:
None
DocumentWithSoftDelete.find_many_in_all
@classmethod
def find_many_in_all(
cls: type[FindType],
*args: Mapping[Any, Any] | bool,
projection_model: type["DocumentProjectionType"] | None = None,
skip: int | None = None,
limit: int | None = None,
sort: None | str | list[tuple[str, SortDirection]] = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]
Returns a query object including both active and soft-deleted documents.
Returns:
FindMany - A query object that includes both deleted and non-deleted documents.
DocumentWithSoftDelete.find_many
@classmethod
def find_many(
cls: type[FindType],
*args: Mapping[Any, Any] | bool,
projection_model: type["DocumentProjectionType"] | None = None,
skip: int | None = None,
limit: int | None = None,
sort: None | str | list[tuple[str, SortDirection]] = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]
Returns a query object that excludes soft-deleted documents.
This overrides the base :meth:`find_many` method by automatically applying a filter {"deleted_at": None}.
Returns:
FindMany - A query object for fetching only non-deleted documents.
DocumentWithSoftDelete.find_one
@classmethod
def find_one(
cls: type[FindType],
*args: Mapping[Any, Any] | bool,
projection_model: type["DocumentProjectionType"] | None = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindOne[FindType] | FindOne["DocumentProjectionType"]
Returns a single document that has not been soft-deleted.
This overrides the base :meth:`find_one` method by automatically applying a filter {"deleted_at": None}.
Returns:
FindOne - A query object for a single non-deleted document.