A wrapper for using pymongo-async
with mongoengine
syntax.
Transactions are supported out of the box.
pip install mongoengine-async-extension
- Please note that this is a wrapper around mongoengine.
- Most of its operations have been migrated to async; the syntax remains similar.
from mongoengine import connect
from mongoengine_async_extension import async_mongo_connect
# Both connections are opened against the same database name.
connect(db="mongoengine_async_test") # Synchronous MongoEngine connection
async_mongo_connect(db="mongoengine_async_test") # Asynchronous mongoengine-async connection
from mongoengine import DateField, Document, ObjectIdField
from mongoengine_async_extension import QS
class FooModel(Document):
    """Example document stored in the ``foo`` collection.

    Declares two required date fields and a required tenant ObjectId, with
    one index on ``tenant`` and a named index on ``(tenant, start_date)``.
    """

    meta = {
        "collection": "foo",
        "indexes": [
            {"fields": ["tenant"]},
            {
                "name": "default sort index",
                "fields": ["tenant", "start_date"],
            },
        ],
    }

    start_date = DateField(required=True, description="Start date of fiscal year")
    end_date = DateField(required=True, description="End date of fiscal year")
    tenant = ObjectIdField(required=True)
# NOTE(review): MongoEngine's QuerySet.timeout() appears to take a required `enabled`
# argument, and the result of this call is discarded — confirm whether this line is intended.
FooModel.objects.timeout()
# Wrap the document class in QS; the examples below operate through this object.
queryset = QS(FooModel)
"""
Class QS
Args:
model (type[T]): Mongoengine Document class
from_son (Callable[[dict], T], optional): from_son callable. Defaults to model._from_son.
auto_deference (bool, optional): auto_deference in _from_son. Defaults to False.
throw_pymongo_errors (bool, optional): If True, raise pymongo errors instead of mongoengine errors.
Defaults to False.
session (AsyncClientSession, optional): Session to use.
read_preference: The read preference to use.
write_concern: An instance of :class:`~pymongo.write_concern.WriteConcern`
read_concern: An instance of :class:`~pymongo.read_concern.ReadConcern`.
"""
from faker import Faker
faker = Faker()
# Insert 100 new FooModel documents
await queryset.insert(
docs=[FooModel(start_date=fake.date(), end_date=fake.date(), tenant=tenant) for _ in range(100)],
load_bulk=True
)
# [<FooModel: FooModel object>, ...]
# List the first 10 documents, including only 'id' and 'start_date', sorted by 'start_date'
results: list[FooModel] = await queryset.limit(10).only("id", "start_date").sort("start_date").to_list()
print([i.to_mongo() for i in results])
# Output example: [SON([('_id', ObjectId('...')), ('start_date', datetime.datetime(...))]), ...]
# Filter by ID and exclude 'start_date'
# NOTE: `results` is rebound here, so the lookups below use the document returned by this query.
results: list[FooModel] = await queryset.filter(id__in=[results[0].id]).exclude("start_date").to_list()
print([i.to_mongo() for i in results])
# Output example: [SON([('_id', ObjectId('...')), ('end_date', datetime.datetime(...)), ('tenant', ObjectId('...'))])]
# Get a single document by ID using filter()
one_doc: FooModel = await queryset.filter(id=results[0].id).get()
print(one_doc)
# Output: FooModel object
# Get a single document by ID directly from get()
one_doc: FooModel = await queryset.get(id=results[0].id)
print(one_doc)
# Output: FooModel object
# Count documents matching a specific ID
print(await queryset.filter(id=results[0].id).count())
# Output: 1
# Update all documents in the collection (use with caution!)
print(await queryset.filter().update(start_date=datetime.datetime.now()))
# The output `201` indicates the number of documents modified.
# Delete a specific document by ID
print(await queryset.filter(id=results[0].id).delete())
# Output: DeleteResult(...) (details about the delete operation)
# Perform a simple aggregation to project only the '_id' field using aggregate_cursor
# aggregate_cursor() is awaited first to obtain the cursor; to_list() is then awaited to collect the results.
print(await (await queryset.aggregate_cursor(pipeline=[{"$project": {"_id": 1}}])).to_list())
# Output example: [{'_id': ObjectId('...')}, ...]
# Perform the same aggregation using the convenience aggregate() method
# Here a single await yields the list directly.
print(await queryset.aggregate(pipeline=[{"$project": {"_id": 1}}]))
# Output example: [{'_id': ObjectId('...')}, ...]
model = FooModel(
start_date=fake.date(),
end_date=fake.date(),
tenant=ObjectId(),
)
await queryset.doc(model).save()
print(model.to_mongo())
# Output: SON([...]) (the saved document's data)
doc: FooModel = await queryset.get(id='some_id')
# Modify the 'start_date' of the 'model' instance
# QS updates the original model and also returns it, you may access it in both ways
updated_doc = await queryset.doc(doc).modify(
start_date=datetime.datetime.now().replace(year=2999),
)
print(updated_doc)
# Output: FooModel object (the modified document instance)
print(doc)
# Output: FooModel object (the modified document instance)
# updated_doc == doc
# NOTE(review): ObjectId() generates a fresh id, so it is presumably a placeholder here —
# substitute the id of an existing document.
doc: FooModel = await queryset.get(id=ObjectId())
# Delete the fetched document from the database
print(await queryset.doc(doc).delete())
# Output: bool (true if success)
from pymongo import AsyncMongoClient
from pymongo.asynchronous.client_session import AsyncClientSession
from mongoengine_async_extension import QS, async_mongo_client
db: AsyncMongoClient = async_mongo_client()
session: AsyncClientSession = db.start_session()
async with await session.start_transaction() as session:
queryset = QS(FooModel, session=session)
doc: FooModel = await queryset.get(id=ObjectId())
await queryset.doc(doc).modify(
start_date=datetime.datetime.now().replace(year=2999),
)
await queryset.doc(doc).delete()