Skip to content

Query

beanie.odm.queries.find

FindQuery

class FindQuery(Generic[FindQueryResultType], UpdateMethods, SessionMethods,
                CloneInterface)

Find Query base class

FindQuery.get_filter_query

def get_filter_query() -> Mapping[str, Any]

Returns: MongoDB filter query

FindQuery.delete

def delete(session: Optional[AsyncIOMotorClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           **pymongo_kwargs: Any) -> Union[DeleteOne, DeleteMany]

Provide search criteria to the Delete query

Arguments:

  • session: Optional[AsyncIOMotorClientSession]

Returns:

Union[DeleteOne, DeleteMany]

FindQuery.project

def project(projection_model)

Apply projection parameter

Arguments:

  • projection_model: Optional[Type[BaseModel]] - projection model

Returns:

self

FindQuery.count

async def count() -> int

Number of found documents

Returns:

int

FindQuery.exists

async def exists() -> bool

Check whether the find query would return any documents

Returns:

bool

FindMany

class FindMany(FindQuery[FindQueryResultType],
               BaseCursorQuery[FindQueryResultType], AggregateMethods)

Find Many query class

FindMany.find_many

def find_many(
    *args: Union[Mapping[str, Any], bool],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs: Any
) -> Union["FindMany[FindQueryResultType]",
           "FindMany[FindQueryProjectionType]"]

Find many documents by criteria

Arguments:

  • args: *Mapping[str, Any] - search criteria
  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindMany - query instance

FindMany.project

def project(
    projection_model: Optional[Type[FindQueryProjectionType]]
) -> Union["FindMany[FindQueryResultType]",
           "FindMany[FindQueryProjectionType]"]

Apply projection parameter

Arguments:

  • projection_model: Optional[Type[BaseModel]] - projection model

Returns:

self

FindMany.find

def find(
    *args: Union[Mapping[str, Any], bool],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs: Any
) -> Union["FindMany[FindQueryResultType]",
           "FindMany[FindQueryProjectionType]"]

The same as find_many(...)

FindMany.sort

def sort(
    *args: Optional[Union[str, Tuple[str, SortDirection],
                          List[Tuple[str, SortDirection]]]]
) -> "FindMany[FindQueryResultType]"

Add sort parameters

Arguments:

  • args: Union[str, Tuple[str, SortDirection], List[Tuple[str, SortDirection]]] - A key or a tuple (key, direction) or a list of (key, direction) pairs specifying the sort order for this query.

Returns:

self

FindMany.skip

def skip(n: Optional[int]) -> "FindMany[FindQueryResultType]"

Set skip parameter

Arguments:

  • n: int

Returns:

self

FindMany.limit

def limit(n: Optional[int]) -> "FindMany[FindQueryResultType]"

Set limit parameter

Arguments:

  • n: int

FindMany.update

def update(*args: Mapping[str, Any],
           session: Optional[AsyncIOMotorClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           **pymongo_kwargs: Any)

Create an Update query with the given modifications and provide the search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • session: Optional[AsyncIOMotorClientSession]
  • bulk_writer: Optional[BulkWriter]

Returns:

UpdateMany query

FindMany.upsert

def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncIOMotorClientSession] = None,
           **pymongo_kwargs: Any)

Create an Update query with the given modifications and provide the search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • on_insert: DocType - document to insert if there is no matched document in the collection
  • session: Optional[AsyncIOMotorClientSession]

Returns:

UpdateMany query

FindMany.update_many

def update_many(*args: Mapping[str, Any],
                session: Optional[AsyncIOMotorClientSession] = None,
                bulk_writer: Optional[BulkWriter] = None,
                **pymongo_kwargs: Any) -> UpdateMany

Provide search criteria to the UpdateMany query

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • session: Optional[AsyncIOMotorClientSession]

Returns:

UpdateMany query

FindMany.delete_many

def delete_many(session: Optional[AsyncIOMotorClientSession] = None,
                bulk_writer: Optional[BulkWriter] = None,
                **pymongo_kwargs: Any) -> DeleteMany

Provide search criteria to the DeleteMany query

Arguments:

  • session: Optional[AsyncIOMotorClientSession]

Returns:

DeleteMany query

FindMany.aggregate

def aggregate(
    aggregation_pipeline: List[Any],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    **pymongo_kwargs: Any
) -> Union[
        AggregationQuery[Dict[str, Any]],
        AggregationQuery[FindQueryProjectionType],
]

Provide search criteria to the AggregationQuery

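Arguments:

  • aggregation_pipeline: List[Any] - aggregation pipeline
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[AsyncIOMotorClientSession]
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for aggregate operation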

Returns:

AggregationQuery

FindMany.first_or_none

async def first_or_none() -> Optional[FindQueryResultType]

Returns the first found element or None if no elements were found

FindMany.count

async def count() -> int

Number of found documents

Returns:

int

FindOne

class FindOne(FindQuery[FindQueryResultType])

Find One query class

FindOne.project

def project(
    projection_model: Optional[Type[FindQueryProjectionType]] = None
) -> Union["FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"]

Apply projection parameter

Arguments:

  • projection_model: Optional[Type[BaseModel]] - projection model

Returns:

self

FindOne.find_one

def find_one(
    *args: Union[Mapping[str, Any], bool],
    projection_model: Optional[Type[FindQueryProjectionType]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs: Any
) -> Union["FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"]

Find one document by criteria

Arguments:

  • args: *Mapping[str, Any] - search criteria
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindOne - query instance

FindOne.update

def update(*args: Mapping[str, Any],
           session: Optional[AsyncIOMotorClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any)

Create an Update query with the given modifications and provide the search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • session: Optional[AsyncIOMotorClientSession]
  • bulk_writer: Optional[BulkWriter]
  • response_type: Optional[UpdateResponse]

Returns:

UpdateOne query

FindOne.upsert

def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncIOMotorClientSession] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any)

Create an Update query with the given modifications and provide the search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • on_insert: DocType - document to insert if there is no matched document in the collection
  • session: Optional[AsyncIOMotorClientSession]
  • response_type: Optional[UpdateResponse]

Returns:

UpdateOne query

FindOne.update_one

def update_one(*args: Mapping[str, Any],
               session: Optional[AsyncIOMotorClientSession] = None,
               bulk_writer: Optional[BulkWriter] = None,
               response_type: Optional[UpdateResponse] = None,
               **pymongo_kwargs: Any) -> UpdateOne

Create an UpdateOne query with the given modifications and provide the search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply
  • session: Optional[AsyncIOMotorClientSession] - PyMongo session
  • response_type: Optional[UpdateResponse]

Returns:

UpdateOne query

FindOne.delete_one

def delete_one(session: Optional[AsyncIOMotorClientSession] = None,
               bulk_writer: Optional[BulkWriter] = None,
               **pymongo_kwargs: Any) -> DeleteOne

Provide search criteria to the DeleteOne query

Arguments:

  • session: Optional[AsyncIOMotorClientSession] - PyMongo session

Returns:

DeleteOne query

FindOne.replace_one

async def replace_one(
        document: "DocType",
        session: Optional[AsyncIOMotorClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None) -> Optional[UpdateResult]

Replace the found document with the provided one

Arguments:

  • document: Document - document, which will replace the found one
  • session: Optional[AsyncIOMotorClientSession] - PyMongo session
  • bulk_writer: Optional[BulkWriter] - Beanie bulk writer

Returns:

UpdateResult

FindOne.__await__

def __await__() -> Generator[Coroutine, Any, Optional[FindQueryResultType]]

Run the query

Returns:

BaseModel

FindOne.count

async def count() -> int

Count the number of documents matching the query

Returns:

int

beanie.odm.queries.delete

DeleteQuery

class DeleteQuery(SessionMethods, CloneInterface)

Deletion Query

DeleteMany

class DeleteMany(DeleteQuery)

DeleteMany.__await__

def __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]

Run the query

DeleteOne

class DeleteOne(DeleteQuery)

DeleteOne.__await__

def __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]

Run the query

beanie.odm.queries.cursor

BaseCursorQuery

class BaseCursorQuery(Generic[CursorResultType])

BaseCursorQuery class. A wrapper over AsyncIOMotorCursor that parses results with the model

BaseCursorQuery.to_list

async def to_list(length: Optional[int] = None) -> List[CursorResultType]

Get list of documents

Arguments:

  • length: Optional[int] - length of the list

Returns:

Union[List[BaseModel], List[Dict[str, Any]]]

beanie.odm.queries.update

UpdateResponse

class UpdateResponse(str, Enum)

UpdateResponse.UPDATE_RESULT

PyMongo update result

UpdateResponse.OLD_DOCUMENT

Original document

UpdateResponse.NEW_DOCUMENT

Updated document

UpdateQuery

class UpdateQuery(UpdateMethods, SessionMethods, CloneInterface)

Update Query base class

UpdateMany

class UpdateMany(UpdateQuery)

Update Many query class

UpdateMany.update

def update(*args: Mapping[str, Any],
           session: Optional[AsyncIOMotorClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"

Provide modifications to the update query.

Arguments:

  • args: *Union[dict, Mapping] - the modifications to apply.
  • session: Optional[AsyncIOMotorClientSession]
  • bulk_writer: Optional[BulkWriter]
  • pymongo_kwargs: pymongo native parameters for update operation

Returns:

UpdateMany query

UpdateMany.upsert

def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncIOMotorClientSession] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"

Provide modifications to the upsert query.

Arguments:

  • args: *Union[dict, Mapping] - the modifications to apply.
  • on_insert: DocType - document to insert if there is no matched document in the collection
  • session: Optional[AsyncIOMotorClientSession]
  • **pymongo_kwargs: pymongo native parameters for update operation

Returns:

UpdateMany query

UpdateMany.update_many

def update_many(*args: Mapping[str, Any],
                session: Optional[AsyncIOMotorClientSession] = None,
                bulk_writer: Optional[BulkWriter] = None,
                **pymongo_kwargs: Any)

Provide modifications to the update query

Arguments:

  • args: *Union[dict, Mapping] - the modifications to apply.
  • session: Optional[AsyncIOMotorClientSession]
  • bulk_writer: "BulkWriter" - Beanie bulk writer
  • pymongo_kwargs: pymongo native parameters for update operation

Returns:

UpdateMany query

UpdateMany.__await__

def __await__() -> Generator[Any, None, Union[UpdateResult, InsertOneResult,
                                              Optional["DocType"]]]

Run the query

UpdateOne

class UpdateOne(UpdateQuery)

Update One query class

UpdateOne.update

def update(*args: Mapping[str, Any],
           session: Optional[AsyncIOMotorClientSession] = None,
           bulk_writer: Optional[BulkWriter] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"

Provide modifications to the update query.

Arguments:

  • args: *Union[dict, Mapping] - the modifications to apply.
  • session: Optional[AsyncIOMotorClientSession]
  • bulk_writer: Optional[BulkWriter]
  • response_type: UpdateResponse
  • pymongo_kwargs: pymongo native parameters for update operation

Returns:

UpdateOne query

UpdateOne.upsert

def upsert(*args: Mapping[str, Any],
           on_insert: "DocType",
           session: Optional[AsyncIOMotorClientSession] = None,
           response_type: Optional[UpdateResponse] = None,
           **pymongo_kwargs: Any) -> "UpdateQuery"

Provide modifications to the upsert query.

Arguments:

  • args: *Union[dict, Mapping] - the modifications to apply.
  • on_insert: DocType - document to insert if there is no matched document in the collection
  • session: Optional[AsyncIOMotorClientSession]
  • response_type: Optional[UpdateResponse]
  • pymongo_kwargs: pymongo native parameters for update operation

Returns:

UpdateOne query

UpdateOne.update_one

def update_one(*args: Mapping[str, Any],
               session: Optional[AsyncIOMotorClientSession] = None,
               bulk_writer: Optional[BulkWriter] = None,
               response_type: Optional[UpdateResponse] = None,
               **pymongo_kwargs: Any)

Provide modifications to the update query. The same as update()

Arguments:

  • args: *Union[dict, Mapping] - the modifications to apply.
  • session: Optional[AsyncIOMotorClientSession]
  • bulk_writer: "BulkWriter" - Beanie bulk writer
  • response_type: Optional[UpdateResponse]
  • pymongo_kwargs: pymongo native parameters for update operation

Returns:

UpdateOne query

UpdateOne.__await__

def __await__() -> Generator[Any, None, Union[UpdateResult, InsertOneResult,
                                              Optional["DocType"]]]

Run the query

beanie.odm.queries.aggregation

AggregationQuery

class AggregationQuery(Generic[AggregationProjectionType],
                       BaseCursorQuery[AggregationProjectionType],
                       SessionMethods, CloneInterface)

Aggregation Query