Query
beanie.odm.queries.find
FindQuery
class FindQuery(Generic[FindQueryResultType], UpdateMethods, SessionMethods,
CloneInterface)
Find Query base class
FindQuery.get_filter_query
def get_filter_query() -> Mapping[str, Any]
Returns: MongoDB filter query
FindQuery.delete
def delete(session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> Union[DeleteOne, DeleteMany]
Provide search criteria to the Delete query
Arguments:
session
: Optional[AsyncIOMotorClientSession]
Returns:
Union[DeleteOne, DeleteMany]
FindQuery.project
def project(projection_model)
Apply projection parameter
Arguments:
projection_model
: Optional[Type[BaseModel]] - projection model
Returns:
self
FindQuery.count
async def count() -> int
Number of found documents
Returns:
int
FindQuery.exists
async def exists() -> bool
Check whether the find query will return any documents
Returns:
bool
FindMany
class FindMany(FindQuery[FindQueryResultType],
BaseCursorQuery[FindQueryResultType], AggregateMethods)
Find Many query class
FindMany.find_many
def find_many(
*args: Union[Mapping[str, Any], bool],
projection_model: Optional[Type[FindQueryProjectionType]] = None,
skip: Optional[int] = None,
limit: Optional[int] = None,
sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
session: Optional[AsyncIOMotorClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
lazy_parse: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs: Any
) -> Union["FindMany[FindQueryResultType]",
"FindMany[FindQueryProjectionType]"]
Find many documents by criteria
Arguments:
args
: *Mapping[str, Any] - search criteria
skip
: Optional[int] - The number of documents to omit.
limit
: Optional[int] - The maximum number of results to return.
sort
: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
projection_model
: Optional[Type[BaseModel]] - projection model
session
: Optional[AsyncIOMotorClientSession] - motor session
ignore_cache
: bool
**pymongo_kwargs
: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)
Returns:
FindMany - query instance
FindMany.project
def project(
projection_model: Optional[Type[FindQueryProjectionType]]
) -> Union["FindMany[FindQueryResultType]",
"FindMany[FindQueryProjectionType]"]
Apply projection parameter
Arguments:
projection_model
: Optional[Type[BaseModel]] - projection model
Returns:
self
FindMany.find
def find(
*args: Union[Mapping[str, Any], bool],
projection_model: Optional[Type[FindQueryProjectionType]] = None,
skip: Optional[int] = None,
limit: Optional[int] = None,
sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
session: Optional[AsyncIOMotorClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
lazy_parse: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs: Any
) -> Union["FindMany[FindQueryResultType]",
"FindMany[FindQueryProjectionType]"]
The same as
find_many(...)
FindMany.sort
def sort(
*args: Optional[Union[str, Tuple[str, SortDirection],
List[Tuple[str, SortDirection]]]]
) -> "FindMany[FindQueryResultType]"
Add sort parameters
Arguments:
args
: Union[str, Tuple[str, SortDirection], List[Tuple[str, SortDirection]]] - A key or a tuple (key, direction) or a list of (key, direction) pairs specifying the sort order for this query.
Returns:
self
FindMany.skip
def skip(n: Optional[int]) -> "FindMany[FindQueryResultType]"
Set skip parameter
Arguments:
n
: Optional[int]
Returns:
self
FindMany.limit
def limit(n: Optional[int]) -> "FindMany[FindQueryResultType]"
Set limit parameter
Arguments:
n
: Optional[int]
Returns:
self
FindMany.update
def update(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args
: *Mapping[str,Any] - the modifications to apply.
session
: Optional[AsyncIOMotorClientSession]
bulk_writer
: Optional[BulkWriter]
Returns:
UpdateMany query
FindMany.upsert
def upsert(*args: Mapping[str, Any],
on_insert: "DocType",
session: Optional[AsyncIOMotorClientSession] = None,
**pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args
: *Mapping[str,Any] - the modifications to apply.
on_insert
: DocType - document to insert if there is no matched document in the collection
session
: Optional[AsyncIOMotorClientSession]
Returns:
UpdateMany query
FindMany.update_many
def update_many(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> UpdateMany
Provide search criteria to the
UpdateMany query
Arguments:
args
: *Mapping[str,Any] - the modifications to apply.
session
: Optional[AsyncIOMotorClientSession]
Returns:
UpdateMany query
FindMany.delete_many
def delete_many(session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> DeleteMany
Provide search criteria to the DeleteMany query
Arguments:
session
: Optional[AsyncIOMotorClientSession]
Returns:
DeleteMany query
FindMany.aggregate
def aggregate(
aggregation_pipeline: List[Any],
projection_model: Optional[Type[FindQueryProjectionType]] = None,
session: Optional[AsyncIOMotorClientSession] = None,
ignore_cache: bool = False,
**pymongo_kwargs: Any
) -> Union[
AggregationQuery[Dict[str, Any]],
AggregationQuery[FindQueryProjectionType],
]
Provide search criteria to the AggregationQuery
Arguments:
aggregation_pipeline
: list - aggregation pipeline. MongoDB doc: https://docs.mongodb.com/manual/core/aggregation-pipeline/
projection_model
: Type[BaseModel] - Projection Model
session
: Optional[AsyncIOMotorClientSession] - PyMongo session
ignore_cache
: bool
Returns:
AggregationQuery
FindMany.first_or_none
async def first_or_none() -> Optional[FindQueryResultType]
Returns the first found element or None if no elements were found
FindMany.count
async def count() -> int
Number of found documents
Returns:
int
FindOne
class FindOne(FindQuery[FindQueryResultType])
Find One query class
FindOne.project
def project(
projection_model: Optional[Type[FindQueryProjectionType]] = None
) -> Union["FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"]
Apply projection parameter
Arguments:
projection_model
: Optional[Type[BaseModel]] - projection model
Returns:
self
FindOne.find_one
def find_one(
*args: Union[Mapping[str, Any], bool],
projection_model: Optional[Type[FindQueryProjectionType]] = None,
session: Optional[AsyncIOMotorClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs: Any
) -> Union["FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"]
Find one document by criteria
Arguments:
args
: *Mapping[str, Any] - search criteria
projection_model
: Optional[Type[BaseModel]] - projection model
session
: Optional[AsyncIOMotorClientSession] - motor session
ignore_cache
: bool
**pymongo_kwargs
: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)
Returns:
FindOne - query instance
FindOne.update
def update(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
response_type: Optional[UpdateResponse] = None,
**pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args
: *Mapping[str,Any] - the modifications to apply.
session
: Optional[AsyncIOMotorClientSession]
bulk_writer
: Optional[BulkWriter]
response_type
: Optional[UpdateResponse]
Returns:
UpdateOne query
FindOne.upsert
def upsert(*args: Mapping[str, Any],
on_insert: "DocType",
session: Optional[AsyncIOMotorClientSession] = None,
response_type: Optional[UpdateResponse] = None,
**pymongo_kwargs: Any)
Create Update with modifications query
and provide search criteria there
Arguments:
args
: *Mapping[str,Any] - the modifications to apply.
on_insert
: DocType - document to insert if there is no matched document in the collection
session
: Optional[AsyncIOMotorClientSession]
response_type
: Optional[UpdateResponse]
Returns:
UpdateOne query
FindOne.update_one
def update_one(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
response_type: Optional[UpdateResponse] = None,
**pymongo_kwargs: Any) -> UpdateOne
Create UpdateOne query using modifications and
provide search criteria there
Arguments:
args
: *Mapping[str,Any] - the modifications to apply
session
: Optional[AsyncIOMotorClientSession] - PyMongo session
response_type
: Optional[UpdateResponse]
Returns:
UpdateOne query
FindOne.delete_one
def delete_one(session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> DeleteOne
Provide search criteria to the DeleteOne query
Arguments:
session
: Optional[AsyncIOMotorClientSession] - PyMongo session
Returns:
DeleteOne query
FindOne.replace_one
async def replace_one(
document: "DocType",
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None) -> Optional[UpdateResult]
Replace the found document with the provided one
Arguments:
document
: Document - document, which will replace the found one
session
: Optional[AsyncIOMotorClientSession] - PyMongo session
bulk_writer
: Optional[BulkWriter] - Beanie bulk writer
Returns:
UpdateResult
FindOne.__await__
def __await__() -> Generator[Coroutine, Any, Optional[FindQueryResultType]]
Run the query
Returns:
BaseModel
FindOne.count
async def count() -> int
Count the number of documents matching the query
Returns:
int
beanie.odm.queries.delete
DeleteQuery
class DeleteQuery(SessionMethods, CloneInterface)
Deletion Query
DeleteMany
class DeleteMany(DeleteQuery)
DeleteMany.__await__
def __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]
Run the query
DeleteOne
class DeleteOne(DeleteQuery)
DeleteOne.__await__
def __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]
Run the query
beanie.odm.queries.cursor
BaseCursorQuery
class BaseCursorQuery(Generic[CursorResultType])
BaseCursorQuery class. Wrapper over AsyncIOMotorCursor, which parses the results with the model
BaseCursorQuery.to_list
async def to_list(length: Optional[int] = None) -> List[CursorResultType]
Get list of documents
Arguments:
length
: Optional[int] - length of the list
Returns:
Union[List[BaseModel], List[Dict[str, Any]]]
beanie.odm.queries.update
UpdateResponse
class UpdateResponse(str, Enum)
UpdateResponse.UPDATE_RESULT
PyMongo update result
UpdateResponse.OLD_DOCUMENT
Original document
UpdateResponse.NEW_DOCUMENT
Updated document
UpdateQuery
class UpdateQuery(UpdateMethods, SessionMethods, CloneInterface)
Update Query base class
UpdateMany
class UpdateMany(UpdateQuery)
Update Many query class
UpdateMany.update
def update(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the update query.
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
session
: Optional[AsyncIOMotorClientSession]
bulk_writer
: Optional[BulkWriter]
pymongo_kwargs
: pymongo native parameters for update operation
Returns:
UpdateMany query
UpdateMany.upsert
def upsert(*args: Mapping[str, Any],
on_insert: "DocType",
session: Optional[AsyncIOMotorClientSession] = None,
**pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the upsert query.
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
on_insert
: DocType - document to insert if there is no matched document in the collection
session
: Optional[AsyncIOMotorClientSession]
**pymongo_kwargs
: pymongo native parameters for update operation
Returns:
UpdateMany query
UpdateMany.update_many
def update_many(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**pymongo_kwargs: Any)
Provide modifications to the update query
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
session
: Optional[AsyncIOMotorClientSession]
bulk_writer
: "BulkWriter" - Beanie bulk writer
pymongo_kwargs
: pymongo native parameters for update operation
Returns:
UpdateMany query
UpdateMany.__await__
def __await__() -> Generator[Any, None, Union[UpdateResult, InsertOneResult,
Optional["DocType"]]]
Run the query
UpdateOne
class UpdateOne(UpdateQuery)
Update One query class
UpdateOne.update
def update(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
response_type: Optional[UpdateResponse] = None,
**pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the update query.
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
session
: Optional[AsyncIOMotorClientSession]
bulk_writer
: Optional[BulkWriter]
response_type
: UpdateResponse
pymongo_kwargs
: pymongo native parameters for update operation
Returns:
UpdateOne query
UpdateOne.upsert
def upsert(*args: Mapping[str, Any],
on_insert: "DocType",
session: Optional[AsyncIOMotorClientSession] = None,
response_type: Optional[UpdateResponse] = None,
**pymongo_kwargs: Any) -> "UpdateQuery"
Provide modifications to the upsert query.
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
on_insert
: DocType - document to insert if there is no matched document in the collection
session
: Optional[AsyncIOMotorClientSession]
response_type
: Optional[UpdateResponse]
pymongo_kwargs
: pymongo native parameters for update operation
Returns:
UpdateOne query
UpdateOne.update_one
def update_one(*args: Mapping[str, Any],
session: Optional[AsyncIOMotorClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
response_type: Optional[UpdateResponse] = None,
**pymongo_kwargs: Any)
Provide modifications to the update query. The same as
update()
Arguments:
args
: *Union[dict, Mapping] - the modifications to apply.
session
: Optional[AsyncIOMotorClientSession]
bulk_writer
: "BulkWriter" - Beanie bulk writer
response_type
: Optional[UpdateResponse]
pymongo_kwargs
: pymongo native parameters for update operation
Returns:
UpdateOne query
UpdateOne.__await__
def __await__() -> Generator[Any, None, Union[UpdateResult, InsertOneResult,
Optional["DocType"]]]
Run the query
beanie.odm.queries.aggregation
AggregationQuery
class AggregationQuery(Generic[AggregationProjectionType],
BaseCursorQuery[AggregationProjectionType],
SessionMethods, CloneInterface)
Aggregation Query