Query
beanie.odm.queries.aggregation
AggregationQuery
class AggregationQuery(Generic[AggregationProjectionType],
                       BaseCursorQuery[AggregationProjectionType],
                       SessionMethods, CloneInterface)
Aggregation Query
beanie.odm.queries.find
FindQuery
class FindQuery(Generic[FindQueryResultType], UpdateMethods, SessionMethods,
                CloneInterface)
Find Query base class
FindQuery.get_filter_query
def get_filter_query() -> Mapping[str, Any]
Returns: MongoDB filter query
FindQuery.delete
def delete(session: Optional[AsyncClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           **pymongo_kwargs: Any) -> Union[DeleteOne, DeleteMany]
Provide search criteria to the Delete query
Arguments:
session: Optional[AsyncClientSession] - pymongo session
Returns:
Union[DeleteOne, DeleteMany]
FindQuery.project
def project(projection_model)
Apply projection parameter
Arguments:
projection_model: Optional[Type[BaseModel]] - projection model
Returns:
self
FindQuery.count
async def count() -> int
Number of found documents
Returns:
int
FindQuery.exists
async def exists() -> bool
Check whether the find query will return anything
Returns:
bool
FindMany
class FindMany(FindQuery[FindQueryResultType],
               BaseCursorQuery[FindQueryResultType], AggregateMethods)
Find Many query class
FindMany.find_many
def find_many(
    *args: Union[Mapping[Any, Any], bool],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    session: Optional[AsyncClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs: Any
) -> Union["FindMany[FindQueryResultType]",
           "FindMany[FindQueryProjectionType]"]
Find many documents by criteria
Arguments:
args: *Mapping[Any, Any] - search criteria
skip: Optional[int] - The number of documents to omit.
limit: Optional[int] - The maximum number of results to return.
sort: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
projection_model: Optional[Type[BaseModel]] - projection model
session: Optional[AsyncClientSession] - pymongo session
ignore_cache: bool
**pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)
Returns:
FindMany - query instance
FindMany.project
def project(
    projection_model: Optional[Type[FindQueryProjectionType]]
) -> Union["FindMany[FindQueryResultType]",
           "FindMany[FindQueryProjectionType]"]
Apply projection parameter
Arguments:
projection_model: Optional[Type[BaseModel]] - projection model
Returns:
self
FindMany.find
def find(
    *args: Union[Mapping[Any, Any], bool],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    session: Optional[AsyncClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs: Any
) -> Union["FindMany[FindQueryResultType]",
           "FindMany[FindQueryProjectionType]"]
The same as
find_many(...)
FindMany.sort
def sort(
    *args: Optional[Union[str, Tuple[str, SortDirection],
                          List[Tuple[str, SortDirection]]]]
) -> "FindMany[FindQueryResultType]"
Add sort parameters
Arguments:
args: Union[str, Tuple[str, SortDirection], List[Tuple[str, SortDirection]]] - A key or a tuple (key, direction) or a list of (key, direction) pairs specifying the sort order for this query.
Returns:
self
FindMany.skip
def skip(n: Optional[int]) -> "FindMany[FindQueryResultType]"
Set skip parameter
Arguments:
n: int
Returns:
self
FindMany.limit
def limit(n: Optional[int]) -> "FindMany[FindQueryResultType]"
Set limit parameter
Arguments:
n: int
FindMany.update
def update(*args: Mapping[str, Any],
           session: Optional[AsyncClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           **pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args: *Mapping[str,Any] - the modifications to apply.
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter]
Returns:
UpdateMany query
FindMany.upsert
def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncClientSession] = None,
           **pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args: *Mapping[str,Any] - the modifications to apply.
on_insert: DocType - document to insert if there is no matched document in the collection
session: Optional[AsyncClientSession] - pymongo session
Returns:
UpdateMany query
FindMany.update_many
def update_many(*args: Mapping[str, Any],
                session: Optional[AsyncClientSession] = None,
                bulk_writer: Optional[BulkWriter] = None,
                **pymongo_kwargs: Any) -> UpdateMany
Provide search criteria to the
UpdateMany query
Arguments:
args: *Mapping[str,Any] - the modifications to apply.
session: Optional[AsyncClientSession] - pymongo session
Returns:
UpdateMany query
FindMany.delete_many
def delete_many(session: Optional[AsyncClientSession] = None,
                bulk_writer: Optional[BulkWriter] = None,
                **pymongo_kwargs: Any) -> DeleteMany
Provide search criteria to the DeleteMany query
Arguments:
session: Optional[AsyncClientSession] - pymongo session
Returns:
DeleteMany query
FindMany.aggregate
def aggregate(
    aggregation_pipeline: List[Any],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    session: Optional[AsyncClientSession] = None,
    ignore_cache: bool = False,
    **pymongo_kwargs: Any
) -> Union[
        AggregationQuery[Dict[str, Any]],
        AggregationQuery[FindQueryProjectionType],
]
Provide search criteria to the AggregationQuery
Arguments:
aggregation_pipeline: list - aggregation pipeline. MongoDB doc: https://docs.mongodb.com/manual/core/aggregation-pipeline/
projection_model: Type[BaseModel] - Projection Model
session: Optional[AsyncClientSession] - pymongo session
ignore_cache: bool
Returns:
AggregationQuery
FindMany.first_or_none
async def first_or_none() -> Optional[FindQueryResultType]
Returns the first found element or None if no elements were found
FindMany.count
async def count() -> int
Number of found documents
Returns:
int
FindOne
class FindOne(FindQuery[FindQueryResultType])
Find One query class
FindOne.project
def project(
    projection_model: Optional[Type[FindQueryProjectionType]] = None
) -> Union["FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"]
Apply projection parameter
Arguments:
projection_model: Optional[Type[BaseModel]] - projection model
Returns:
self
FindOne.find_one
def find_one(
    *args: Union[Mapping[Any, Any], bool],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    session: Optional[AsyncClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs: Any
) -> Union["FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"]
Find one document by criteria
Arguments:
args: *Mapping[Any, Any] - search criteria
projection_model: Optional[Type[BaseModel]] - projection model
session: Optional[AsyncClientSession] - pymongo session
ignore_cache: bool
**pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)
Returns:
FindOne - query instance
FindOne.update
def update(*args: Mapping[str, Any],
           session: Optional[AsyncClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args: *Mapping[str,Any] - the modifications to apply.
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter]
response_type: Optional[UpdateResponse]
Returns:
UpdateOne query
FindOne.upsert
def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncClientSession] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args: *Mapping[str,Any] - the modifications to apply.
on_insert: DocType - document to insert if there is no matched document in the collection
session: Optional[AsyncClientSession] - pymongo session
response_type: Optional[UpdateResponse]
Returns:
UpdateOne query
FindOne.update_one
def update_one(*args: Mapping[str, Any],
               session: Optional[AsyncClientSession] = None,
               bulk_writer: Optional[BulkWriter] = None,
               response_type: Optional[UpdateResponse] = None,
               **pymongo_kwargs: Any) -> UpdateOne
Create UpdateOne query using modifications and
provide search criteria there
Arguments:
args: *Mapping[str,Any] - the modifications to apply
session: Optional[AsyncClientSession] - pymongo session
response_type: Optional[UpdateResponse]
Returns:
UpdateOne query
FindOne.delete_one
def delete_one(session: Optional[AsyncClientSession] = None,
               bulk_writer: Optional[BulkWriter] = None,
               **pymongo_kwargs: Any) -> DeleteOne
Provide search criteria to the DeleteOne query
Arguments:
session: Optional[AsyncClientSession] - pymongo session
Returns:
DeleteOne query
FindOne.replace_one
async def replace_one(
        document: "DocType",
        session: Optional[AsyncClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None) -> Optional[UpdateResult]
Replace the found document with the provided one
Arguments:
document: Document - document, which will replace the found one
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter] - Beanie bulk writer
Returns:
UpdateResult
FindOne.__await__
def __await__() -> Generator[Coroutine, Any, Optional[FindQueryResultType]]
Run the query
Returns:
BaseModel
FindOne.count
async def count() -> int
Count the number of documents matching the query
Returns:
int
beanie.odm.queries.cursor
BaseCursorQuery
class BaseCursorQuery(Generic[CursorResultType])
BaseCursorQuery class. Wrapper over AsyncCursor that parses results with the model
BaseCursorQuery.to_list
async def to_list(length: Optional[int] = None) -> List[CursorResultType]
Get list of documents
Arguments:
length: Optional[int] - length of the list
Returns:
Union[List[BaseModel], List[Dict[str, Any]]]
beanie.odm.queries.delete
DeleteQuery
class DeleteQuery(SessionMethods, CloneInterface)
Deletion Query
DeleteMany
class DeleteMany(DeleteQuery)
DeleteMany.__await__
def __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]
Run the query
DeleteOne
class DeleteOne(DeleteQuery)
DeleteOne.__await__
def __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]
Run the query
beanie.odm.queries.update
UpdateResponse
class UpdateResponse(str, Enum)
UpdateResponse.UPDATE_RESULT
PyMongo update result
UpdateResponse.OLD_DOCUMENT
Original document
UpdateResponse.NEW_DOCUMENT
Updated document
UpdateQuery
class UpdateQuery(UpdateMethods, SessionMethods, CloneInterface)
Update Query base class
UpdateMany
class UpdateMany(UpdateQuery)
Update Many query class
UpdateMany.update
def update(*args: Mapping[str, Any],
           session: Optional[AsyncClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the update query.
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter]
pymongo_kwargs: pymongo native parameters for update operation
Returns:
UpdateMany query
UpdateMany.upsert
def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncClientSession] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the upsert query.
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
on_insert: DocType - document to insert if there is no matched document in the collection
session: Optional[AsyncClientSession] - pymongo session
**pymongo_kwargs: pymongo native parameters for update operation
Returns:
UpdateMany query
UpdateMany.update_many
def update_many(*args: Mapping[str, Any],
                session: Optional[AsyncClientSession] = None,
                bulk_writer: Optional[BulkWriter] = None,
                **pymongo_kwargs: Any)
Provide modifications to the update query
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: "BulkWriter" - Beanie bulk writer
pymongo_kwargs: pymongo native parameters for update operation
Returns:
UpdateMany query
UpdateMany.__await__
def __await__() -> Generator[Any, None, Union[UpdateResult, InsertOneResult,
                                              Optional["DocType"]]]
Run the query
UpdateOne
class UpdateOne(UpdateQuery)
Update One query class
UpdateOne.update
def update(*args: Mapping[str, Any],
           session: Optional[AsyncClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the update query.
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: Optional[BulkWriter]
response_type: UpdateResponse
pymongo_kwargs: pymongo native parameters for update operation
Returns:
UpdateOne query
UpdateOne.upsert
def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncClientSession] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the upsert query.
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
on_insert: DocType - document to insert if there is no matched document in the collection
session: Optional[AsyncClientSession] - pymongo session
response_type: Optional[UpdateResponse]
pymongo_kwargs: pymongo native parameters for update operation
Returns:
UpdateOne query
UpdateOne.update_one
def update_one(*args: Mapping[str, Any],
               session: Optional[AsyncClientSession] = None,
               bulk_writer: Optional[BulkWriter] = None,
               response_type: Optional[UpdateResponse] = None,
               **pymongo_kwargs: Any)
Provide modifications to the update query. The same as update()
Arguments:
args: *Union[dict, Mapping] - the modifications to apply.
session: Optional[AsyncClientSession] - pymongo session
bulk_writer: "BulkWriter" - Beanie bulk writer
response_type: Optional[UpdateResponse]
pymongo_kwargs: pymongo native parameters for update operation
Returns:
UpdateOne query
UpdateOne.__await__
def __await__() -> Generator[Any, None, Union[UpdateResult, InsertOneResult,
                                              Optional["DocType"]]]
Run the query