Skip to content

Interfaces

beanie.odm.interfaces.find

FindInterface

class FindInterface()

FindInterface.find_one

@classmethod
def find_one(
    cls: Type[FindType],
    *args: Union[Mapping[str, Any], bool],
    projection_model: Optional[Type["DocumentProjectionType"]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    with_children: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs
) -> Union[FindOne[FindType], FindOne["DocumentProjectionType"]]

Find one document by criteria.

Returns a FindOne query object. When awaited, it returns either a document or None if no document matches the search criteria.

Arguments:

  • args: *Mapping[str, Any] - search criteria
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[AsyncIOMotorClientSession] - motor session instance
  • ignore_cache: bool
  • **pymongo_kwargs: native pymongo parameters for the find operation (if the Document class contains links, these parameters must match the corresponding parameters of the MongoDB aggregate function)

Returns:

FindOne - find query instance

FindInterface.find_many

@classmethod
def find_many(
    cls: Type[FindType],
    *args: Union[Mapping[str, Any], bool],
    projection_model: Optional[Type["DocumentProjectionType"]] = None,
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]

Find many documents by criteria.

Returns a FindMany query object.

Arguments:

  • args: *Mapping[str, Any] - search criteria
  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • ignore_cache: bool
  • lazy_parse: bool
  • **pymongo_kwargs: native pymongo parameters for the find operation (if the Document class contains links, these parameters must match the corresponding parameters of the MongoDB aggregate function)

Returns:

FindMany - query instance

FindInterface.find

@classmethod
def find(
    cls: Type[FindType],
    *args: Union[Mapping[str, Any], bool],
    projection_model: Optional[Type["DocumentProjectionType"]] = None,
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]

The same as find_many

FindInterface.find_all

@classmethod
def find_all(
    cls: Type[FindType],
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    projection_model: Optional[Type["DocumentProjectionType"]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]

Get all the documents

Arguments:

  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • **pymongo_kwargs: native pymongo parameters for the find operation (if the Document class contains links, these parameters must match the corresponding parameters of the MongoDB aggregate function)

Returns:

FindMany - query instance

FindInterface.all

@classmethod
def all(
    cls: Type[FindType],
    projection_model: Optional[Type["DocumentProjectionType"]] = None,
    skip: Optional[int] = None,
    limit: Optional[int] = None,
    sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: Optional[int] = None,
    nesting_depths_per_field: Optional[Dict[str, int]] = None,
    **pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]

The same as find_all

FindInterface.count

@classmethod
async def count(cls) -> int

Number of documents in the collection

The same as find_all().count()

Returns:

int

beanie.odm.interfaces.setters

SettersInterface

class SettersInterface()

SettersInterface.set_collection

@classmethod
def set_collection(cls, collection)

Collection setter

SettersInterface.set_database

@classmethod
def set_database(cls, database)

Database setter

SettersInterface.set_collection_name

@classmethod
def set_collection_name(cls, name: str)

Collection name setter

beanie.odm.interfaces.clone

beanie.odm.interfaces.aggregation_methods

AggregateMethods

class AggregateMethods()

Aggregate methods

AggregateMethods.sum

async def sum(field: Union[str, ExpressionField],
              session: Optional[AsyncIOMotorClientSession] = None,
              ignore_cache: bool = False) -> Optional[float]

Sum of values of the given field

Example:

class Sample(Document):
    price: int
    count: int

sum_count = await Sample.find(Sample.price <= 100).sum(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • ignore_cache: bool

Returns:

Optional[float] - sum. None if there are no items.

AggregateMethods.avg

async def avg(field,
              session: Optional[AsyncIOMotorClientSession] = None,
              ignore_cache: bool = False) -> Optional[float]

Average of values of the given field

Example:

class Sample(Document):
    price: int
    count: int

avg_count = await Sample.find(Sample.price <= 100).avg(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • ignore_cache: bool

Returns:

Optional[float] - avg. None if there are no items.

AggregateMethods.max

async def max(field: Union[str, ExpressionField],
              session: Optional[AsyncIOMotorClientSession] = None,
              ignore_cache: bool = False) -> Optional[float]

Max of the values of the given field

Example:

class Sample(Document):
    price: int
    count: int

max_count = await Sample.find(Sample.price <= 100).max(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • ignore_cache: bool

Returns:

Optional[float] - max. None if there are no items.

AggregateMethods.min

async def min(field: Union[str, ExpressionField],
              session: Optional[AsyncIOMotorClientSession] = None,
              ignore_cache: bool = False) -> Optional[float]

Min of the values of the given field

Example:

class Sample(Document):
    price: int
    count: int

min_count = await Sample.find(Sample.price <= 100).min(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • ignore_cache: bool

Returns:

Optional[float] - min. None if there are no items.

beanie.odm.interfaces.update

UpdateMethods

class UpdateMethods()

Update methods

UpdateMethods.set

def set(expression: Dict[Union[ExpressionField, str, Any], Any],
        session: Optional[AsyncIOMotorClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None,
        **kwargs)

Set values

Example:

class Sample(Document):
    one: int

await Sample.find(Sample.one == 1).set({Sample.one: 100})

Uses Set operator

Arguments:

  • expression: Dict[Union[ExpressionField, str, Any], Any] - keys and values to set
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

UpdateMethods.current_date

def current_date(expression: Dict[Union[datetime, ExpressionField, str], Any],
                 session: Optional[AsyncIOMotorClientSession] = None,
                 bulk_writer: Optional[BulkWriter] = None,
                 **kwargs)

Set current date

Uses CurrentDate operator

Arguments:

  • expression: Dict[Union[datetime, ExpressionField, str], Any]
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

UpdateMethods.inc

def inc(expression: Dict[Union[ExpressionField, float, int, str], Any],
        session: Optional[AsyncIOMotorClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None,
        **kwargs)

Increment

Example:

class Sample(Document):
    one: int

await Sample.find(Sample.one == 1).inc({Sample.one: 100})

Uses Inc operator

Arguments:

  • expression: Dict[Union[ExpressionField, float, int, str], Any]
  • session: Optional[AsyncIOMotorClientSession] - motor session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

beanie.odm.interfaces.inheritance

beanie.odm.interfaces.session

SessionMethods

class SessionMethods()

Session methods

SessionMethods.set_session

def set_session(session: Optional[AsyncIOMotorClientSession] = None)

Set motor session

Arguments:

  • session: Optional[AsyncIOMotorClientSession] - motor session

beanie.odm.interfaces.aggregate

AggregateInterface

class AggregateInterface()

AggregateInterface.aggregate

@classmethod
def aggregate(
    cls: Type[DocType],
    aggregation_pipeline: list,
    projection_model: Optional[Type[DocumentProjectionType]] = None,
    session: Optional[AsyncIOMotorClientSession] = None,
    ignore_cache: bool = False,
    **pymongo_kwargs
) -> Union[
        AggregationQuery[Dict[str, Any]],
        AggregationQuery[DocumentProjectionType],
]

Aggregate over collection.

Returns an AggregationQuery query object.

Arguments:

  • aggregation_pipeline: list - aggregation pipeline
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[AsyncIOMotorClientSession]
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for aggregate operation

Returns:

AggregationQuery

beanie.odm.interfaces.detector

beanie.odm.interfaces.getters