Interfaces
beanie.odm.interfaces.session
SessionMethods
class SessionMethods()
Session methods
SessionMethods.set_session
def set_session(session: AsyncClientSession | None = None)
Set session
Arguments:
session: Optional[AsyncClientSession] - pymongo session
beanie.odm.interfaces.getters
beanie.odm.interfaces.aggregation_methods
AggregateMethods
class AggregateMethods()
Aggregate methods
AggregateMethods.sum
async def sum(field: ExpressionField | float | int | str,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> float | None
Sum of values of the given field
Example:
class Sample(Document): price: int count: int sum_count = await Document.find(Sample.price <= 100).sum(Sample.count)Arguments:
field: Union[ExpressionField, float, int, str]session: Optional[AsyncClientSession] - pymongo sessionignore_cache: boolReturns:
float - sum. None if there are no items.
AggregateMethods.avg
async def avg(field: ExpressionField | float | int | str,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> float | None
Average of values of the given field
Example:
class Sample(Document): price: int count: int avg_count = await Document.find(Sample.price <= 100).avg(Sample.count)Arguments:
field: Union[ExpressionField, float, int, str]session: Optional[AsyncClientSession] - pymongo sessionignore_cache: boolReturns:
Optional[float] - avg. None if there are no items.
AggregateMethods.max
async def max(field: ExpressionField | str | Any,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> Any | None
Max of the values of the given field
Example:
class Sample(Document): price: int count: int max_count = await Document.find(Sample.price <= 100).max(Sample.count)Arguments:
field: Union[ExpressionField, str, Any]session: Optional[AsyncClientSession] - pymongo sessionReturns:
Any - max value. None if there are no items.
AggregateMethods.min
async def min(field: ExpressionField | str | Any,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> Any | None
Min of the values of the given field
Example:
class Sample(Document): price: int count: int min_count = await Document.find(Sample.price <= 100).min(Sample.count)Arguments:
field: Union[ExpressionField, str, Any]session: Optional[AsyncClientSession] - pymongo sessionReturns:
Any - min value. None if there are no items.
beanie.odm.interfaces.update
UpdateMethods
class UpdateMethods()
Update methods
UpdateMethods.set
def set(expression: dict[ExpressionField | str | Any, Any],
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
**kwargs: Any)
Set values
Example:
class Sample(Document): one: int await Document.find(Sample.one == 1).set({Sample.one: 100})Uses Set operator
Arguments:
expression: Dict[Union[ExpressionField, str, Any], Any] - keys and values to setsession: Optional[AsyncClientSession] - pymongo sessionbulk_writer: Optional[BulkWriter] - bulk writerReturns:
self
UpdateMethods.current_date
def current_date(expression: dict[datetime | ExpressionField | str, Any],
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
**kwargs: Any)
Set current date
Uses CurrentDate operator
Arguments:
expression: Dict[Union[datetime, ExpressionField, str], Any]session: Optional[AsyncClientSession] - pymongo sessionbulk_writer: Optional[BulkWriter] - bulk writerReturns:
self
UpdateMethods.inc
def inc(expression: dict[ExpressionField | float | int | str, Any],
session: AsyncClientSession | None = None,
bulk_writer: BulkWriter | None = None,
**kwargs: Any)
Increment
Example:
class Sample(Document): one: int await Document.find(Sample.one == 1).inc({Sample.one: 100})Uses Inc operator
Arguments:
expression: Dict[Union[ExpressionField, float, int, str], Any]session: Optional[AsyncClientSession] - pymongo sessionbulk_writer: Optional[BulkWriter] - bulk writerReturns:
self
beanie.odm.interfaces.find
FindInterface
class FindInterface()
FindInterface.find_one
@classmethod
def find_one(
cls: type[FindType],
*args: Mapping[Any, Any] | bool,
projection_model: type["DocumentProjectionType"] | None = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindOne[FindType] | FindOne["DocumentProjectionType"]
Find one document by criteria.
Returns FindOne query object. When awaited this will either return a document or None if no document exists for the search criteria.
Arguments:
args: *Mapping[Any, Any] - search criteriaprojection_model: Optional[type[BaseModel]] - projection modelsession: Optional[AsyncClientSession] - pymongo session.ignore_cache: bool**pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)Returns:
FindOne - find query instance
FindInterface.find_many
@classmethod
def find_many(
cls: type[FindType],
*args: Mapping[Any, Any] | bool,
projection_model: type["DocumentProjectionType"] | None = None,
skip: int | None = None,
limit: int | None = None,
sort: str | list[tuple[str, SortDirection]] | None = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]
Find many documents by criteria.
Returns FindMany query object
Arguments:
args: *Mapping[Any, Any] - search criteriaskip: Optional[int] - The number of documents to omit.limit: Optional[int] - The maximum number of results to return.sort: Union[None, str, list[tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.projection_model: Optional[type[BaseModel]] - projection modelsession: Optional[AsyncClientSession] - pymongo session.ignore_cache: boollazy_parse: bool**pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)Returns:
FindMany - query instance
FindInterface.find
@classmethod
def find(
cls: type[FindType],
*args: Mapping[Any, Any] | bool,
projection_model: type["DocumentProjectionType"] | None = None,
skip: int | None = None,
limit: int | None = None,
sort: str | list[tuple[str, SortDirection]] | None = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]
The same as find_many
FindInterface.find_all
@classmethod
def find_all(
cls: type[FindType],
skip: int | None = None,
limit: int | None = None,
sort: str | list[tuple[str, SortDirection]] | None = None,
projection_model: type["DocumentProjectionType"] | None = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]
Get all the documents
Arguments:
skip: Optional[int] - The number of documents to omit.limit: Optional[int] - The maximum number of results to return.sort: Union[None, str, list[tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.projection_model: Optional[type[BaseModel]] - projection modelsession: Optional[AsyncClientSession] - pymongo session.**pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)Returns:
FindMany - query instance
FindInterface.all
@classmethod
def all(
cls: type[FindType],
projection_model: type["DocumentProjectionType"] | None = None,
skip: int | None = None,
limit: int | None = None,
sort: str | list[tuple[str, SortDirection]] | None = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: int | None = None,
nesting_depths_per_field: dict[str, int] | None = None,
**pymongo_kwargs: Any
) -> FindMany[FindType] | FindMany["DocumentProjectionType"]
the same as find_all
FindInterface.count
@classmethod
async def count(cls) -> int
Number of documents in the collections
The same as find_all().count()
Returns:
int
beanie.odm.interfaces.inheritance
beanie.odm.interfaces.aggregate
AggregateInterface
class AggregateInterface()
AggregateInterface.aggregate
@classmethod
def aggregate(
cls: type[DocType],
aggregation_pipeline: list,
projection_model: type[DocumentProjectionType] | None = None,
session: AsyncClientSession | None = None,
ignore_cache: bool = False,
**pymongo_kwargs: Any
) -> (AggregationQuery[dict[str, Any]]
| AggregationQuery[DocumentProjectionType])
Aggregate over collection.
Returns AggregationQuery query object
Arguments:
aggregation_pipeline: list - aggregation pipelineprojection_model: Type[BaseModel]session: Optional[AsyncClientSession] - pymongo session.ignore_cache: bool**pymongo_kwargs: pymongo native parameters for aggregate operationReturns:
AggregateInterface.sum
@classmethod
async def sum(cls,
field: ExpressionField | float | int | str,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> float | None
Sum of values of the given field over the entire collection.
Example:
class Sample(Document): price: int sum_count = await Document.sum(Sample.price)Arguments:
field: Union[ExpressionField, float, int, str]session: Optional[AsyncClientSession] - pymongo sessionignore_cache: boolReturns:
float - sum. None if there are no items.
AggregateInterface.avg
@classmethod
async def avg(cls,
field: ExpressionField | float | int | str,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> float | None
Average of values of the given field over the entire collection.
Example:
class Sample(Document): price: int avg_count = await Document.avg(Sample.price)Arguments:
field: Union[ExpressionField, float, int, str]session: Optional[AsyncClientSession] - pymongo sessionignore_cache: boolReturns:
Optional[float] - avg. None if there are no items.
AggregateInterface.max
@classmethod
async def max(cls,
field: ExpressionField | str | Any,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> Any | None
Max of the values of the given field over the entire collection.
Example:
class Sample(Document): price: int max_count = await Document.max(Sample.price)Arguments:
field: Union[ExpressionField, str, Any]session: Optional[AsyncClientSession] - pymongo sessionReturns:
Any - max value. None if there are no items.
AggregateInterface.min
@classmethod
async def min(cls,
field: ExpressionField | str | Any,
session: AsyncClientSession | None = None,
ignore_cache: bool = False) -> Any | None
Min of the values of the given field over the entire collection.
Example:
class Sample(Document): price: int min_count = await Document.min(Sample.price)Arguments:
field: Union[ExpressionField, str, Any]session: Optional[AsyncClientSession] - pymongo sessionReturns:
Any - min value. None if there are no items.
beanie.odm.interfaces.setters
SettersInterface
class SettersInterface()
SettersInterface.set_collection
@classmethod
def set_collection(cls, collection)
Collection setter
SettersInterface.set_database
@classmethod
def set_database(cls, database)
Database setter
SettersInterface.set_collection_name
@classmethod
def set_collection_name(cls, name: str)
Collection name setter