Skip to content

Interfaces

beanie.odm.interfaces.session

SessionMethods

class SessionMethods()

Session methods

SessionMethods.set_session

def set_session(session: AsyncClientSession | None = None)

Set session

Arguments:

  • session: Optional[AsyncClientSession] - pymongo session

beanie.odm.interfaces.getters

beanie.odm.interfaces.aggregation_methods

AggregateMethods

class AggregateMethods()

Aggregate methods

AggregateMethods.sum

async def sum(field: ExpressionField | float | int | str,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> float | None

Sum of values of the given field

Example:

class Sample(Document):
    price: int
    count: int

sum_count = await Document.find(Sample.price <= 100).sum(Sample.count)

Arguments:

  • field: Union[ExpressionField, float, int, str]
  • session: Optional[AsyncClientSession] - pymongo session
  • ignore_cache: bool

Returns:

float - sum. None if there are no items.

AggregateMethods.avg

async def avg(field: ExpressionField | float | int | str,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> float | None

Average of values of the given field

Example:

class Sample(Document):
    price: int
    count: int

avg_count = await Document.find(Sample.price <= 100).avg(Sample.count)

Arguments:

  • field: Union[ExpressionField, float, int, str]
  • session: Optional[AsyncClientSession] - pymongo session
  • ignore_cache: bool

Returns:

Optional[float] - avg. None if there are no items.

AggregateMethods.max

async def max(field: ExpressionField | str | Any,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> Any | None

Max of the values of the given field

Example:

class Sample(Document):
    price: int
    count: int

max_count = await Document.find(Sample.price <= 100).max(Sample.count)

Arguments:

  • field: Union[ExpressionField, str, Any]
  • session: Optional[AsyncClientSession] - pymongo session

Returns:

Any - max value. None if there are no items.

AggregateMethods.min

async def min(field: ExpressionField | str | Any,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> Any | None

Min of the values of the given field

Example:

class Sample(Document):
    price: int
    count: int

min_count = await Document.find(Sample.price <= 100).min(Sample.count)

Arguments:

  • field: Union[ExpressionField, str, Any]
  • session: Optional[AsyncClientSession] - pymongo session

Returns:

Any - min value. None if there are no items.

beanie.odm.interfaces.update

UpdateMethods

class UpdateMethods()

Update methods

UpdateMethods.set

def set(expression: dict[ExpressionField | str | Any, Any],
        session: AsyncClientSession | None = None,
        bulk_writer: BulkWriter | None = None,
        **kwargs: Any)

Set values

Example:

class Sample(Document):
    one: int

await Document.find(Sample.one == 1).set({Sample.one: 100})

Uses Set operator

Arguments:

  • expression: Dict[Union[ExpressionField, str, Any], Any] - keys and values to set
  • session: Optional[AsyncClientSession] - pymongo session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

UpdateMethods.current_date

def current_date(expression: dict[datetime | ExpressionField | str, Any],
                 session: AsyncClientSession | None = None,
                 bulk_writer: BulkWriter | None = None,
                 **kwargs: Any)

Set current date

Uses CurrentDate operator

Arguments:

  • expression: Dict[Union[datetime, ExpressionField, str], Any]
  • session: Optional[AsyncClientSession] - pymongo session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

UpdateMethods.inc

def inc(expression: dict[ExpressionField | float | int | str, Any],
        session: AsyncClientSession | None = None,
        bulk_writer: BulkWriter | None = None,
        **kwargs: Any)

Increment

Example:

class Sample(Document):
    one: int

await Document.find(Sample.one == 1).inc({Sample.one: 100})

Uses Inc operator

Arguments:

  • expression: Dict[Union[ExpressionField, float, int, str], Any]
  • session: Optional[AsyncClientSession] - pymongo session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

beanie.odm.interfaces.find

FindInterface

class FindInterface()

FindInterface.find_one

@classmethod
def find_one(
    cls: type[FindType],
    *args: Mapping[Any, Any] | bool,
    projection_model: type["DocumentProjectionType"] | None = None,
    session: AsyncClientSession | None = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    with_children: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: dict[str, int] | None = None,
    **pymongo_kwargs: Any
) -> FindOne[FindType] | FindOne["DocumentProjectionType"]

Find one document by criteria.

Returns FindOne query object. When awaited this will either return a document or None if no document exists for the search criteria.

Arguments:

  • args: *Mapping[Any, Any] - search criteria
  • projection_model: Optional[type[BaseModel]] - projection model
  • session: Optional[AsyncClientSession] - pymongo session.
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindOne - find query instance

FindInterface.find_many

@classmethod
def find_many(
    cls: type[FindType],
    *args: Mapping[Any, Any] | bool,
    projection_model: type["DocumentProjectionType"] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    sort: str | list[tuple[str, SortDirection]] | None = None,
    session: AsyncClientSession | None = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: dict[str, int] | None = None,
    **pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]

Find many documents by criteria.

Returns FindMany query object

Arguments:

  • args: *Mapping[Any, Any] - search criteria
  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, list[tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[type[BaseModel]] - projection model
  • session: Optional[AsyncClientSession] - pymongo session.
  • ignore_cache: bool
  • lazy_parse: bool
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindMany - query instance

FindInterface.find

@classmethod
def find(
    cls: type[FindType],
    *args: Mapping[Any, Any] | bool,
    projection_model: type["DocumentProjectionType"] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    sort: str | list[tuple[str, SortDirection]] | None = None,
    session: AsyncClientSession | None = None,
    ignore_cache: bool = False,
    fetch_links: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: dict[str, int] | None = None,
    **pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]

The same as find_many

FindInterface.find_all

@classmethod
def find_all(
    cls: type[FindType],
    skip: int | None = None,
    limit: int | None = None,
    sort: str | list[tuple[str, SortDirection]] | None = None,
    projection_model: type["DocumentProjectionType"] | None = None,
    session: AsyncClientSession | None = None,
    ignore_cache: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: dict[str, int] | None = None,
    **pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]

Get all the documents

Arguments:

  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, list[tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[type[BaseModel]] - projection model
  • session: Optional[AsyncClientSession] - pymongo session.
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindMany - query instance

FindInterface.all

@classmethod
def all(
    cls: type[FindType],
    projection_model: type["DocumentProjectionType"] | None = None,
    skip: int | None = None,
    limit: int | None = None,
    sort: str | list[tuple[str, SortDirection]] | None = None,
    session: AsyncClientSession | None = None,
    ignore_cache: bool = False,
    with_children: bool = False,
    lazy_parse: bool = False,
    nesting_depth: int | None = None,
    nesting_depths_per_field: dict[str, int] | None = None,
    **pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]

the same as find_all

FindInterface.count

@classmethod
async def count(cls) -> int

Number of documents in the collections

The same as find_all().count()

Returns:

int

beanie.odm.interfaces.inheritance

beanie.odm.interfaces.aggregate

AggregateInterface

class AggregateInterface()

AggregateInterface.aggregate

@classmethod
def aggregate(
    cls: type[DocType],
    aggregation_pipeline: list,
    projection_model: type[DocumentProjectionType] | None = None,
    session: AsyncClientSession | None = None,
    ignore_cache: bool = False,
    **pymongo_kwargs: Any
) -> (AggregationQuery[dict[str, Any]]
      | AggregationQuery[DocumentProjectionType])

Aggregate over collection.

Returns AggregationQuery query object

Arguments:

  • aggregation_pipeline: list - aggregation pipeline
  • projection_model: Type[BaseModel]
  • session: Optional[AsyncClientSession] - pymongo session.
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for aggregate operation

Returns:

AggregationQuery

AggregateInterface.sum

@classmethod
async def sum(cls,
              field: ExpressionField | float | int | str,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> float | None

Sum of values of the given field over the entire collection.

Example:

class Sample(Document):
    price: int

sum_count = await Document.sum(Sample.price)

Arguments:

  • field: Union[ExpressionField, float, int, str]
  • session: Optional[AsyncClientSession] - pymongo session
  • ignore_cache: bool

Returns:

float - sum. None if there are no items.

AggregateInterface.avg

@classmethod
async def avg(cls,
              field: ExpressionField | float | int | str,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> float | None

Average of values of the given field over the entire collection.

Example:

class Sample(Document):
    price: int

avg_count = await Document.avg(Sample.price)

Arguments:

  • field: Union[ExpressionField, float, int, str]
  • session: Optional[AsyncClientSession] - pymongo session
  • ignore_cache: bool

Returns:

Optional[float] - avg. None if there are no items.

AggregateInterface.max

@classmethod
async def max(cls,
              field: ExpressionField | str | Any,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> Any | None

Max of the values of the given field over the entire collection.

Example:

class Sample(Document):
    price: int

max_count = await Document.max(Sample.price)

Arguments:

  • field: Union[ExpressionField, str, Any]
  • session: Optional[AsyncClientSession] - pymongo session

Returns:

Any - max value. None if there are no items.

AggregateInterface.min

@classmethod
async def min(cls,
              field: ExpressionField | str | Any,
              session: AsyncClientSession | None = None,
              ignore_cache: bool = False) -> Any | None

Min of the values of the given field over the entire collection.

Example:

class Sample(Document):
    price: int

min_count = await Document.min(Sample.price)

Arguments:

  • field: Union[ExpressionField, str, Any]
  • session: Optional[AsyncClientSession] - pymongo session

Returns:

Any - min value. None if there are no items.

beanie.odm.interfaces.setters

SettersInterface

class SettersInterface()

SettersInterface.set_collection

@classmethod
def set_collection(cls, collection)

Collection setter

SettersInterface.set_database

@classmethod
def set_database(cls, database)

Database setter

SettersInterface.set_collection_name

@classmethod
def set_collection_name(cls, name: str)

Collection name setter

beanie.odm.interfaces.clone

beanie.odm.interfaces.detector