Motor: Asynchronous Python driver for MongoDB

About

Motor presents a coroutine-based API for non-blocking access to MongoDB from Tornado or asyncio.

The source is on GitHub and the docs are on ReadTheDocs.

“We use Motor in high throughput environments, processing tens of thousands of requests per second. It allows us to take full advantage of modern hardware, ensuring we utilise the entire capacity of our purchased CPUs. This helps us be more efficient with computing power, compute spend and minimises the environmental impact of our infrastructure as a result.”

David Mytton, Server Density

“We develop easy-to-use sensors and sensor systems with open source software to ensure every innovator, from school child to laboratory researcher, has the same opportunity to create. We integrate Motor into our software to guarantee massively scalable sensor systems for everyone.”

Ryan Smith, inXus Interactive

Install with:

$ python -m pip install motor

Getting Help

If you’re having trouble or have questions about Motor, ask your question on our MongoDB Community Forum. You may also want to consider a commercial support subscription. Once you get an answer, it’d be great if you could work it back into this documentation and contribute!

Issues

All issues should be reported (and can be tracked / voted for / commented on) at the main MongoDB JIRA bug tracker, in the “Motor” project.

Feature Requests / Feedback

Use our feedback engine to send us feature requests and general feedback about Motor.

Contributing

Motor has a large community and contributions are always encouraged. Contributions can be as simple as minor tweaks to this documentation. To contribute, fork the project on GitHub and send a pull request.

Changes

See the Changelog for a full list of changes to Motor.

Contents

Differences between Motor and PyMongo

Important

This page describes using Motor with Tornado. Beginning in version 0.5 Motor can also integrate with asyncio instead of Tornado.

Major differences

Connecting to MongoDB

Motor provides a single client class, MotorClient. Unlike PyMongo’s MongoClient, Motor’s client class does not begin connecting in the background when it is instantiated. Instead it connects on demand, when you first attempt an operation.
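
For example, a sketch assuming a mongod on the default host and port: you can trigger the first connection explicitly by running a cheap command such as server_info():

client = motor.motor_tornado.MotorClient()

async def f():
    # The constructor above did no I/O. This first operation connects,
    # and raises ServerSelectionTimeoutError if no server is reachable.
    info = await client.server_info()
    print(info['version'])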

Coroutines

Motor supports nearly every method PyMongo does, but Motor methods that do network I/O are coroutines. See Tutorial: Using Motor With Tornado.
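
For example, a minimal sketch assuming db is a MotorDatabase:

async def f():
    # find_one() does network I/O, so it must be awaited.
    document = await db.test_collection.find_one({'key': 'value'})
    print(document)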

Threading and forking

Multithreading and forking are not supported; Motor is intended to be used in a single-threaded Tornado application. See Tornado’s documentation on running Tornado in production to take advantage of multiple cores.

Minor differences

GridFS
  • File-like

    PyMongo’s GridIn and GridOut strive to act like Python’s built-in file objects, so they can be passed to many functions that expect files. But the I/O methods of MotorGridIn and MotorGridOut are asynchronous, so they cannot obey the file API and aren’t suitable in the same circumstances as files.

  • Setting properties

    In PyMongo, you can set arbitrary attributes on a GridIn and they’re stored as metadata on the server, even after the GridIn is closed:

    fs = gridfs.GridFSBucket(db)
    grid_in, file_id = fs.open_upload_stream('test_file')
    grid_in.close()
    grid_in.my_field = 'my_value'  # Sends update to server.
    

    Updating metadata on a MotorGridIn is asynchronous, so the API is different:

    async def f():
        fs = motor.motor_tornado.MotorGridFSBucket(db)
        grid_in, file_id = fs.open_upload_stream('test_file')
        await grid_in.close()
    
        # Sends update to server.
        await grid_in.set('my_field', 'my_value')
    
is_locked

In PyMongo, is_locked is a property of MongoClient. Since determining whether the server has been fsyncLocked requires I/O, Motor has no such convenience property. The equivalent in Motor is:

result = await client.admin.current_op()
locked = bool(result.get('fsyncLock'))

system_js

PyMongo supports Javascript procedures stored in MongoDB with syntax like:

>>> db.system_js.my_func = 'function(x) { return x * x; }'
>>> db.system_js.my_func(2)
4.0

Motor does not.

Cursor slicing

In PyMongo, the following raises an IndexError if the collection has fewer than 101 documents:

# Can raise IndexError.
doc = db.collection.find()[100]

In Motor, however, no exception is raised. The query simply has no results:

async def f():
    cursor = db.collection.find()[100]

    # Iterates zero or one times.
    async for doc in cursor:
        print(doc)

The difference arises because the PyMongo Cursor’s slicing operator blocks until it has queried the MongoDB server and can determine whether a document exists at the desired offset; Motor simply returns a new MotorCursor with a skip and limit applied.
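
In other words, Motor’s slicing is only shorthand for skip and limit; a sketch of the rough equivalence:

# Neither line does any I/O, and neither can raise IndexError.
cursor = db.collection.find()[100]
cursor = db.collection.find().skip(100).limit(1)  # Roughly equivalent.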

Creating a collection

There are two ways to create a capped collection using PyMongo:

# Typical:
db.create_collection(
    'collection1',
    capped=True,
    size=1000)

# Unusual:
collection = Collection(
    db,
    'collection2',
    capped=True,
    size=1000)

Motor can’t do I/O in a constructor, so the unusual style is prohibited and only the typical style is allowed:

async def f():
    await db.create_collection(
        'collection1',
        capped=True,
        size=1000)

Motor Features

Non-Blocking

Motor is an asynchronous driver for MongoDB. It can be used from Tornado or asyncio applications. Motor never blocks the event loop while connecting to MongoDB or performing I/O.

Featureful

Motor wraps almost all of PyMongo’s API and makes it non-blocking. For the few PyMongo features not implemented in Motor, see Differences between Motor and PyMongo.

Convenient With tornado.gen

The tornado.gen module lets you use coroutines to simplify asynchronous code. Motor methods return Futures that are convenient to use with coroutines.
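
For example, a sketch assuming db is a MotorDatabase; the same Future works with a generator-based tornado.gen coroutine or a native coroutine:

from tornado import gen

@gen.coroutine
def get_doc():
    # Motor returns a Future: yield it here, or await it
    # in a native "async def" coroutine.
    doc = yield db.test_collection.find_one({'key': 'value'})
    print(doc)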

Configurable IOLoops

Motor supports Tornado applications with multiple IOLoops. Pass the io_loop argument to MotorClient to configure the loop for a client instance.
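
A sketch, assuming your application manages its own loops:

import tornado.ioloop
import motor.motor_tornado

# Bind the client to a specific IOLoop instead of the current one.
loop = tornado.ioloop.IOLoop()
client = motor.motor_tornado.MotorClient(io_loop=loop)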

Streams Static Files from GridFS

Motor can stream data from GridFS to a Tornado RequestHandler using stream_to_handler() or the GridFSHandler class. It can also serve GridFS data with aiohttp using the AIOHTTPGridFS class.
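
A sketch of a Tornado handler that streams one GridFS file; the handler name and the "db" application setting are assumptions of this example:

import motor.motor_tornado
import tornado.web

class FileHandler(tornado.web.RequestHandler):
    async def get(self, filename):
        fs = motor.motor_tornado.MotorGridFSBucket(self.settings['db'])
        grid_out = await fs.open_download_stream_by_name(filename)
        self.set_header('Content-Type', 'application/octet-stream')
        # Write the file's contents to the response in chunks.
        await grid_out.stream_to_handler(self)
        self.finish()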

Installation

Install Motor from PyPI with pip:

$ python3 -m pip install motor

Pip automatically installs Motor’s prerequisite packages. See Requirements.

To install Motor from sources, you can clone its git repository and do:

$ python3 -m pip install .

Requirements

The current version of Motor requires:

  • CPython 3.5.2 and later.
  • PyMongo 3.11 and later.

Motor can integrate with either Tornado or asyncio.

The default authentication mechanism for MongoDB 3.0+ is SCRAM-SHA-1.

Building the docs requires Sphinx.

Compatibility Matrix

Motor and PyMongo

Motor Version   PyMongo Version
1.0             3.3+
1.1             3.4+
1.2             3.6+
1.3             3.6+
2.0             3.7+
2.1             3.10+
2.2             3.11+
2.3             3.11+

Motor and MongoDB

                MongoDB Version
Motor Version   2.2  2.4  2.6  3.0  3.2  3.4  3.6  4.0  4.2  4.4
1.0             Y    Y    Y    Y    Y    N    N    N    N    N
1.1             Y    Y    Y    Y    Y    Y    N    N    N    N
1.2             N    N    Y    Y    Y    Y    Y    N    N    N
1.3             N    N    Y    Y    Y    Y    Y    N    N    N
2.0             N    N    N    Y    Y    Y    Y    Y    Y    N
2.1             N    N    N    Y    Y    Y    Y    Y    Y    N
2.2             N    N    N    Y    Y    Y    Y    Y    Y    Y
2.3             N    N    N    Y    Y    Y    Y    Y    Y    Y

There is no relationship between PyMongo and MongoDB version numbers, although the numbers happen to be close or equal in recent releases of PyMongo and MongoDB. Use the PyMongo compatibility matrix to determine what MongoDB version is supported by PyMongo. Use the compatibility matrix above to determine what MongoDB version Motor supports.

Motor and Tornado

Where “N” appears in this matrix, the versions of Motor and Tornado are known to be incompatible, or have not been tested together.

                Tornado Version
Motor Version   3.x  4.x  5.x  6.x
1.0             Y    Y    N    N
1.1             Y    Y    N    N
1.2             N    Y    N    N
1.3             N    Y    N    N
2.0             N    Y    Y    N
2.1             N    Y    Y    Y
2.2             N    N    Y    Y
2.3             N    N    Y    Y

Motor and Python

Motor 1.2 dropped support for the short-lived version of the “async for” protocol implemented in Python 3.5.0 and 3.5.1. Motor continues to work with “async for” loops in Python 3.5.2 and later.

Motor 1.2.5 and 1.3.1 add compatibility with Python 3.7, but at the cost of dropping Python 3.4.3 and older. Motor 2.2 dropped support for Pythons older than 3.5.2.

                Python Version
Motor Version   2.5  2.6  2.7  3.3  3.4  3.5.0  3.5.2  3.6  3.7  3.8
1.0             N    Y    Y    Y    Y    Y      Y      Y    N    N
1.1             N    Y    Y    Y    Y    Y      Y      Y    N    N
1.2             N    N    Y    N    Y    N      Y      Y    Y    N
1.3             N    N    Y    N    Y    N      Y      Y    Y    N
2.0             N    N    Y    N    Y    N      Y      Y    Y    N
2.1             N    N    Y    N    Y    N      Y      Y    Y    Y
2.2             N    N    N    N    N    N      Y      Y    Y    Y
2.3             N    N    N    N    N    N      Y      Y    Y    Y

Not Supported

Motor does not support Jython or IronPython.

Configuration

TLS Protocol Version

Industry best practices, and some regulations, require the use of TLS 1.1 or newer. Though no application changes are required for Motor to make use of the newest protocols, some operating systems or versions may not provide an OpenSSL version new enough to support them.

Users of macOS older than 10.13 (High Sierra) will need to install Python from python.org, homebrew, macports, or another similar source.

Users of Linux or other non-macOS Unix can check their OpenSSL version like this:

$ openssl version

If the version number is less than 1.0.1, support for TLS 1.1 or newer is not available. Contact your operating system vendor for a solution or upgrade to a newer distribution.

You can check your Python interpreter by installing the requests module and executing the following command:

python -c "import requests; print(requests.get('https://www.howsmyssl.com/a/check', verify=False).json()['tls_version'])"

You should see “TLS 1.X” where X is >= 1.

You can read more about TLS versions and their security implications here:

https://www.owasp.org/index.php/Transport_Layer_Protection_Cheat_Sheet#Rule_-_Only_Support_Strong_Protocols

Thread Pool Size

Motor uses the Python standard library’s ThreadPoolExecutor to defer network operations to threads. By default, the executor uses at most five threads per CPU core on your system; to override the default, set the environment variable MOTOR_MAX_WORKERS.

Some additional threads are used for monitoring servers and background tasks, so the total count of threads in your process will be greater.
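
For example, to cap the executor at a hypothetical ten threads, set the variable before starting your process (my_app.py stands in for your application):

$ export MOTOR_MAX_WORKERS=10
$ python3 my_app.py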

Tutorial: Using Motor With Tornado

A guide to using MongoDB and Tornado with Motor.

Tutorial Prerequisites

You can learn about MongoDB with the MongoDB Tutorial before you learn Motor.

Install pip and then do:

$ pip install tornado motor

Once done, the following should run in the Python shell without raising an exception:

>>> import motor.motor_tornado

This tutorial also assumes that a MongoDB instance is running on the default host and port. Assuming you have downloaded and installed MongoDB, you can start it like so:

$ mongod

Object Hierarchy

Motor, like PyMongo, represents data with a 4-level object hierarchy:

  • MotorClient represents a mongod process, or a cluster of them. You explicitly create one of these client objects, connect it to a running mongod or mongods, and use it for the lifetime of your application.
  • MotorDatabase: Each mongod has a set of databases (distinct sets of data files on disk). You can get a reference to a database from a client.
  • MotorCollection: A database has a set of collections, which contain documents; you get a reference to a collection from a database.
  • MotorCursor: Executing find() on a MotorCollection gets a MotorCursor, which represents the set of documents matching a query.

Creating a Client

You typically create a single instance of MotorClient at the time your application starts up.

>>> client = motor.motor_tornado.MotorClient()

This connects to a mongod listening on the default host and port. You can specify the host and port like:

>>> client = motor.motor_tornado.MotorClient('localhost', 27017)

Motor also supports connection URIs:

>>> client = motor.motor_tornado.MotorClient('mongodb://localhost:27017')

Connect to a replica set like:

>>> client = motor.motor_tornado.MotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')

Getting a Database

A single instance of MongoDB can support multiple independent databases. From an open client, you can get a reference to a particular database with dot-notation or bracket-notation:

>>> db = client.test_database
>>> db = client['test_database']

Creating a reference to a database does no I/O and does not require an await expression.

Tornado Application Startup Sequence

Now that we can create a client and get a database, we’re ready to start a Tornado application that uses Motor:

db = motor.motor_tornado.MotorClient().test_database

application = tornado.web.Application([
    (r'/', MainHandler)
], db=db)

application.listen(8888)
tornado.ioloop.IOLoop.current().start()

There are two things to note in this code. First, the MotorClient constructor doesn’t actually connect to the server; the client will initiate a connection when you attempt the first operation. Second, passing the database as the db keyword argument to Application makes it available to request handlers:

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        db = self.settings['db']

It is a common mistake to create a new client object for every request; this comes at a dire performance cost. Create the client when your application starts and reuse that one client for the lifetime of the process, as shown in these examples.

The Tornado HTTPServer class’s start() method is a simple way to fork multiple web servers and use all of your machine’s CPUs. However, you must create your MotorClient after forking:

# Create the application before creating a MotorClient.
application = tornado.web.Application([
    (r'/', MainHandler)
])

server = tornado.httpserver.HTTPServer(application)
server.bind(8888)

# Forks one process per CPU.
server.start(0)

# Now, in each child process, create a MotorClient.
application.settings['db'] = MotorClient().test_database
IOLoop.current().start()

For production-ready, multiple-CPU deployments of Tornado there are better methods than HTTPServer.start(). See Tornado’s guide to Running and deploying.

Getting a Collection

A collection is a group of documents stored in MongoDB, and can be thought of as roughly the equivalent of a table in a relational database. Getting a collection in Motor works the same as getting a database:

>>> collection = db.test_collection
>>> collection = db['test_collection']

Just like getting a reference to a database, getting a reference to a collection does no I/O and doesn’t require an await expression.

Inserting a Document

As in PyMongo, Motor represents MongoDB documents with Python dictionaries. To store a document in MongoDB, call insert_one() in an await expression:

>>> async def do_insert():
...     document = {'key': 'value'}
...     result = await db.test_collection.insert_one(document)
...     print('result %s' % repr(result.inserted_id))
...
>>>
>>> IOLoop.current().run_sync(do_insert)
result ObjectId('...')

See also: The MongoDB documentation on insert.

A typical beginner’s mistake with Motor is to insert documents in a loop, not waiting for each insert to complete before beginning the next:

>>> for i in range(2000):
...     db.test_collection.insert_one({'i': i})

In PyMongo this would insert each document in turn using a single socket, but Motor attempts to run all the insert_one() operations at once. This requires up to max_pool_size open sockets connected to MongoDB, which taxes the client and server. To ensure instead that all inserts run in sequence, use await:

>>> async def do_insert():
...     for i in range(2000):
...         await db.test_collection.insert_one({'i': i})
...
>>> IOLoop.current().run_sync(do_insert)

For better performance, insert documents in large batches with insert_many():

>>> async def do_insert():
...     result = await db.test_collection.insert_many(
...         [{'i': i} for i in range(2000)])
...     print('inserted %d docs' % (len(result.inserted_ids),))
...
>>> IOLoop.current().run_sync(do_insert)
inserted 2000 docs

Getting a Single Document With find_one()

Use find_one() to get the first document that matches a query. For example, to get a document where the value for key “i” is less than 1:

>>> async def do_find_one():
...     document = await db.test_collection.find_one({'i': {'$lt': 1}})
...     pprint.pprint(document)
...
>>> IOLoop.current().run_sync(do_find_one)
{'_id': ObjectId('...'), 'i': 0}

The result is a dictionary matching the one that we inserted previously.

The returned document contains an "_id", which was automatically added on insert.

(We use pprint here instead of print to ensure the document’s key names are sorted the same in your output as ours.)

See also: The MongoDB documentation on find.

Querying for More Than One Document

Use find() to query for a set of documents. find() does no I/O and does not require an await expression. It merely creates a MotorCursor instance. The query is actually executed on the server when you call to_list() or execute an async for loop.

To find all documents with “i” less than 5:

>>> async def do_find():
...     cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
...     for document in await cursor.to_list(length=100):
...         pprint.pprint(document)
...
>>> IOLoop.current().run_sync(do_find)
{'_id': ObjectId('...'), 'i': 0}
{'_id': ObjectId('...'), 'i': 1}
{'_id': ObjectId('...'), 'i': 2}
{'_id': ObjectId('...'), 'i': 3}
{'_id': ObjectId('...'), 'i': 4}

A length argument is required when you call to_list to prevent Motor from buffering an unlimited number of documents.

async for

You can handle one document at a time in an async for loop:

>>> async def do_find():
...     c = db.test_collection
...     async for document in c.find({'i': {'$lt': 2}}):
...         pprint.pprint(document)
...
>>> IOLoop.current().run_sync(do_find)
{'_id': ObjectId('...'), 'i': 0}
{'_id': ObjectId('...'), 'i': 1}

You can apply a sort, limit, or skip to a query before you begin iterating:

>>> async def do_find():
...     cursor = db.test_collection.find({'i': {'$lt': 4}})
...     # Modify the query before iterating
...     cursor.sort('i', -1).skip(1).limit(2)
...     async for document in cursor:
...         pprint.pprint(document)
...
>>> IOLoop.current().run_sync(do_find)
{'_id': ObjectId('...'), 'i': 2}
{'_id': ObjectId('...'), 'i': 1}

The cursor does not actually retrieve each document from the server individually; it gets documents efficiently in large batches.
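
If you want to tune the batch size, batch_size() is available on Motor cursors just as in PyMongo; a sketch:

>>> async def do_find():
...     # Ask the server for at most 10 documents per batch.
...     cursor = db.test_collection.find({'i': {'$lt': 100}}).batch_size(10)
...     async for document in cursor:
...         pprint.pprint(document)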

Counting Documents

Use count_documents() to determine the number of documents in a collection, or the number of documents that match a query:

>>> async def do_count():
...     n = await db.test_collection.count_documents({})
...     print('%s documents in collection' % n)
...     n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
...     print('%s documents where i > 1000' % n)
...
>>> IOLoop.current().run_sync(do_count)
2000 documents in collection
999 documents where i > 1000

Updating Documents

replace_one() changes a document. It requires two parameters: a query that specifies which document to replace, and a replacement document. The query follows the same syntax as for find() or find_one(). To replace a document:

>>> async def do_replace():
...     coll = db.test_collection
...     old_document = await coll.find_one({'i': 50})
...     print('found document: %s' % pprint.pformat(old_document))
...     _id = old_document['_id']
...     result = await coll.replace_one({'_id': _id}, {'key': 'value'})
...     print('replaced %s document' % result.modified_count)
...     new_document = await coll.find_one({'_id': _id})
...     print('document is now %s' % pprint.pformat(new_document))
...
>>> IOLoop.current().run_sync(do_replace)
found document: {'_id': ObjectId('...'), 'i': 50}
replaced 1 document
document is now {'_id': ObjectId('...'), 'key': 'value'}

You can see that replace_one() replaced everything in the old document except its _id with the new document.

Use update_one() with MongoDB’s modifier operators to update part of a document and leave the rest intact. We’ll find the document whose “i” is 51 and use the $set operator to set “key” to “value”:

>>> async def do_update():
...     coll = db.test_collection
...     result = await coll.update_one({'i': 51}, {'$set': {'key': 'value'}})
...     print('updated %s document' % result.modified_count)
...     new_document = await coll.find_one({'i': 51})
...     print('document is now %s' % pprint.pformat(new_document))
...
>>> IOLoop.current().run_sync(do_update)
updated 1 document
document is now {'_id': ObjectId('...'), 'i': 51, 'key': 'value'}

“key” is set to “value” and “i” is still 51.

update_one() only affects the first document it finds. To update all matching documents, use update_many():

await coll.update_many({'i': {'$gt': 100}},
                       {'$set': {'key': 'value'}})

See also: The MongoDB documentation on update.

Removing Documents

delete_many() takes a query with the same syntax as find(). delete_many() immediately removes all matching documents.

>>> async def do_delete_many():
...     coll = db.test_collection
...     n = await coll.count_documents({})
...     print('%s documents before calling delete_many()' % n)
...     result = await db.test_collection.delete_many({'i': {'$gte': 1000}})
...     print('%s documents after' % (await coll.count_documents({})))
...
>>> IOLoop.current().run_sync(do_delete_many)
2000 documents before calling delete_many()
1000 documents after

See also: The MongoDB documentation on remove.

Commands

All operations on MongoDB are implemented internally as commands. Run them using the command() method on MotorDatabase:

>>> from bson import SON
>>> async def use_distinct_command():
...     response = await db.command(SON([("distinct", "test_collection"),
...                                      ("key", "i")]))
...
>>> IOLoop.current().run_sync(use_distinct_command)

Since the order of command parameters matters, don’t use a Python dict to pass the command’s parameters. Instead, make a habit of using bson.SON, from the bson module included with PyMongo.

Many commands have special helper methods, such as create_collection() or aggregate(), but these are just conveniences atop the basic command() method.
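
For instance, a sketch of issuing the "create" command directly, roughly what create_collection() does for you (the collection name here is made up):

>>> async def create_capped():
...     await db.command(SON([('create', 'capped_coll'),
...                           ('capped', True),
...                           ('size', 1000)]))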

See also: The MongoDB documentation on commands.

Further Reading

The handful of classes and methods introduced here are sufficient for daily tasks. The API documentation for MotorClient, MotorDatabase, MotorCollection, and MotorCursor provides a reference to Motor’s complete feature set.

Learning to use the MongoDB driver is just the beginning, of course. For in-depth instruction in MongoDB itself, see The MongoDB Manual.

Tutorial: Using Motor With asyncio

A guide to using MongoDB and asyncio with Motor.

Tutorial Prerequisites

You can learn about MongoDB with the MongoDB Tutorial before you learn Motor.

Using Python 3.5.2 or later, do:

$ python3 -m pip install motor

This tutorial assumes that a MongoDB instance is running on the default host and port. Assuming you have downloaded and installed MongoDB, you can start it like so:

$ mongod

Object Hierarchy

Motor, like PyMongo, represents data with a 4-level object hierarchy:

  • AsyncIOMotorClient represents a mongod process, or a cluster of them. You explicitly create one of these client objects, connect it to a running mongod or mongods, and use it for the lifetime of your application.
  • AsyncIOMotorDatabase: Each mongod has a set of databases (distinct sets of data files on disk). You can get a reference to a database from a client.
  • AsyncIOMotorCollection: A database has a set of collections, which contain documents; you get a reference to a collection from a database.
  • AsyncIOMotorCursor: Executing find() on an AsyncIOMotorCollection gets an AsyncIOMotorCursor, which represents the set of documents matching a query.

Creating a Client

You typically create a single instance of AsyncIOMotorClient at the time your application starts up.

>>> import motor.motor_asyncio
>>> client = motor.motor_asyncio.AsyncIOMotorClient()

This connects to a mongod listening on the default host and port. You can specify the host and port like:

>>> client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)

Motor also supports connection URIs:

>>> client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')

Connect to a replica set like:

>>> client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')

Getting a Database

A single instance of MongoDB can support multiple independent databases. From an open client, you can get a reference to a particular database with dot-notation or bracket-notation:

>>> db = client.test_database
>>> db = client['test_database']

Creating a reference to a database does no I/O and does not require an await expression.

Getting a Collection

A collection is a group of documents stored in MongoDB, and can be thought of as roughly the equivalent of a table in a relational database. Getting a collection in Motor works the same as getting a database:

>>> collection = db.test_collection
>>> collection = db['test_collection']

Just like getting a reference to a database, getting a reference to a collection does no I/O and doesn’t require an await expression.

Inserting a Document

As in PyMongo, Motor represents MongoDB documents with Python dictionaries. To store a document in MongoDB, call insert_one() in an await expression:

>>> async def do_insert():
...     document = {'key': 'value'}
...     result = await db.test_collection.insert_one(document)
...     print('result %s' % repr(result.inserted_id))
...
>>>
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_insert())
result ObjectId('...')

See also: The MongoDB documentation on insert.

Insert documents in large batches with insert_many():

>>> async def do_insert():
...     result = await db.test_collection.insert_many(
...         [{'i': i} for i in range(2000)])
...     print('inserted %d docs' % (len(result.inserted_ids),))
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_insert())
inserted 2000 docs

Getting a Single Document With find_one

Use find_one() to get the first document that matches a query. For example, to get a document where the value for key “i” is less than 1:

>>> async def do_find_one():
...     document = await db.test_collection.find_one({'i': {'$lt': 1}})
...     pprint.pprint(document)
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_find_one())
{'_id': ObjectId('...'), 'i': 0}

The result is a dictionary matching the one that we inserted previously.

Note: The returned document contains an "_id", which was automatically added on insert.

See also: The MongoDB documentation on find.

Querying for More Than One Document

Use find() to query for a set of documents. find() does no I/O and does not require an await expression. It merely creates an AsyncIOMotorCursor instance. The query is actually executed on the server when you call to_list() or execute an async for loop.

To find all documents with “i” less than 5:

>>> async def do_find():
...     cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
...     for document in await cursor.to_list(length=100):
...         pprint.pprint(document)
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_find())
{'_id': ObjectId('...'), 'i': 0}
{'_id': ObjectId('...'), 'i': 1}
{'_id': ObjectId('...'), 'i': 2}
{'_id': ObjectId('...'), 'i': 3}
{'_id': ObjectId('...'), 'i': 4}

A length argument is required when you call to_list to prevent Motor from buffering an unlimited number of documents.

async for

You can handle one document at a time in an async for loop:

>>> async def do_find():
...     c = db.test_collection
...     async for document in c.find({'i': {'$lt': 2}}):
...         pprint.pprint(document)
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_find())
{'_id': ObjectId('...'), 'i': 0}
{'_id': ObjectId('...'), 'i': 1}

You can apply a sort, limit, or skip to a query before you begin iterating:

>>> async def do_find():
...     cursor = db.test_collection.find({'i': {'$lt': 4}})
...     # Modify the query before iterating
...     cursor.sort('i', -1).skip(1).limit(2)
...     async for document in cursor:
...         pprint.pprint(document)
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_find())
{'_id': ObjectId('...'), 'i': 2}
{'_id': ObjectId('...'), 'i': 1}

The cursor does not actually retrieve each document from the server individually; it gets documents efficiently in large batches.

Counting Documents

Use count_documents() to determine the number of documents in a collection, or the number of documents that match a query:

>>> async def do_count():
...     n = await db.test_collection.count_documents({})
...     print('%s documents in collection' % n)
...     n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
...     print('%s documents where i > 1000' % n)
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_count())
2000 documents in collection
999 documents where i > 1000

Updating Documents

replace_one() changes a document. It requires two parameters: a query that specifies which document to replace, and a replacement document. The query follows the same syntax as for find() or find_one(). To replace a document:

>>> async def do_replace():
...     coll = db.test_collection
...     old_document = await coll.find_one({'i': 50})
...     print('found document: %s' % pprint.pformat(old_document))
...     _id = old_document['_id']
...     result = await coll.replace_one({'_id': _id}, {'key': 'value'})
...     print('replaced %s document' % result.modified_count)
...     new_document = await coll.find_one({'_id': _id})
...     print('document is now %s' % pprint.pformat(new_document))
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_replace())
found document: {'_id': ObjectId('...'), 'i': 50}
replaced 1 document
document is now {'_id': ObjectId('...'), 'key': 'value'}

You can see that replace_one() replaced everything in the old document except its _id with the new document.

Use update_one() with MongoDB’s modifier operators to update part of a document and leave the rest intact. We’ll find the document whose “i” is 51 and use the $set operator to set “key” to “value”:

>>> async def do_update():
...     coll = db.test_collection
...     result = await coll.update_one({'i': 51}, {'$set': {'key': 'value'}})
...     print('updated %s document' % result.modified_count)
...     new_document = await coll.find_one({'i': 51})
...     print('document is now %s' % pprint.pformat(new_document))
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_update())
updated 1 document
document is now {'_id': ObjectId('...'), 'i': 51, 'key': 'value'}

“key” is set to “value” and “i” is still 51.

update_one() only affects the first document it finds. To update all matching documents, use update_many():

await coll.update_many({'i': {'$gt': 100}},
                       {'$set': {'key': 'value'}})

See also: The MongoDB documentation on update.

Deleting Documents

delete_many() takes a query with the same syntax as find(). delete_many() immediately removes all matching documents.

>>> async def do_delete_many():
...     coll = db.test_collection
...     n = await coll.count_documents({})
...     print('%s documents before calling delete_many()' % n)
...     result = await db.test_collection.delete_many({'i': {'$gte': 1000}})
...     print('%s documents after' % (await coll.count_documents({})))
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(do_delete_many())
2000 documents before calling delete_many()
1000 documents after

See also: The MongoDB documentation on remove.

Commands

All operations on MongoDB are implemented internally as commands. Run them using the command() method on AsyncIOMotorDatabase:

>>> from bson import SON
>>> async def use_distinct_command():
...     response = await db.command(SON([("distinct", "test_collection"),
...                                      ("key", "i")]))
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(use_distinct_command())

Since the order of command parameters matters, don’t use a Python dict to pass the command’s parameters. Instead, make a habit of using bson.SON, from the bson module included with PyMongo.

Many commands have special helper methods, such as create_collection() or aggregate(), but these are just conveniences atop the basic command() method.

See also: The MongoDB documentation on commands.

A Web Application With aiohttp

Let us create a web application using aiohttp, a popular HTTP package for asyncio. Install it with:

python3 -m pip install aiohttp

We are going to make a trivial web site with two pages served from MongoDB. To begin:

import asyncio

from aiohttp import web
from motor.motor_asyncio import AsyncIOMotorClient


async def setup_db():
    db = AsyncIOMotorClient().test
    await db.pages.drop()
    html = '<html><body>{}</body></html>'
    await db.pages.insert_one({'_id': 'page-one',
                               'body': html.format('Hello!')})

    await db.pages.insert_one({'_id': 'page-two',
                               'body': html.format('Goodbye.')})

    return db

The AsyncIOMotorClient constructor does not actually connect to MongoDB. The client connects on demand, when you attempt the first operation. We create it and assign the “test” database’s handle to db.

The setup_db coroutine drops the “pages” collection (plainly, this code is for demonstration purposes), then inserts two documents. Each document’s page name is its unique id, and the “body” field is a simple HTML page. Finally, setup_db returns the database handle.

We’ll use the setup_db coroutine soon. First, we need a request handler that serves pages from the data we stored in MongoDB.

async def page_handler(request):
    # If the visitor gets "/pages/page-one", then page_name is "page-one".
    page_name = request.match_info.get('page_name')

    # Retrieve the long-lived database handle.
    db = request.app['db']

    # Find the page by its unique id.
    document = await db.pages.find_one(page_name)

    if not document:
        return web.HTTPNotFound(text='No page named {!r}'.format(page_name))

    return web.Response(body=document['body'].encode(),
                        content_type='text/html')

We start the server by running setup_db and passing the database handle to an aiohttp.web.Application:

loop = asyncio.get_event_loop()
db = loop.run_until_complete(setup_db())
app = web.Application()
app['db'] = db
# Route requests to the page_handler() coroutine.
app.router.add_get('/pages/{page_name}', page_handler)
web.run_app(app)

Note that it is a common mistake to create a new client object for every request; this comes at a dire performance cost. Create the client when your application starts and reuse that one client for the lifetime of the process. You can maintain the client by storing a database handle from the client on your application object, as shown in this example.

Visit localhost:8080/pages/page-one and the server responds “Hello!”. At localhost:8080/pages/page-two it responds “Goodbye.” At other URLs it returns a 404.

The complete code is in the Motor repository in examples/aiohttp_example.py.

See also the AIOHTTPGridFS Example.

Further Reading

The handful of classes and methods introduced here are sufficient for daily tasks. The API documentation for AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection, and AsyncIOMotorCursor provides a reference to Motor’s complete feature set.

Learning to use the MongoDB driver is just the beginning, of course. For in-depth instruction in MongoDB itself, see The MongoDB Manual.

Motor Examples

Bulk Write Operations

This tutorial explains how to take advantage of Motor’s bulk write operation features. Executing write operations in batches reduces the number of network round trips, increasing write throughput.

This example describes using Motor with Tornado. Beginning in version 0.5 Motor can also integrate with asyncio instead of Tornado.

Bulk Insert

A batch of documents can be inserted by passing a list or generator to the insert_many() method. Motor will automatically split the batch into smaller sub-batches based on the maximum message size accepted by MongoDB, supporting very large bulk insert operations.

>>> async def f():
...     await db.test.insert_many(({'i': i} for i in range(10000)))
...     count = await db.test.count_documents({})
...     print("Final count: %d" % count)
>>>
>>> IOLoop.current().run_sync(f)
Final count: 10000

Mixed Bulk Write Operations

Motor also supports executing mixed bulk write operations. A batch of insert, update, and remove operations can be executed together using the bulk write operations API.

Ordered Bulk Write Operations

Ordered bulk write operations are batched and sent to the server in the order provided for serial execution. The return value is an instance of BulkWriteResult describing the type and count of operations performed.

>>> from pprint import pprint
>>> from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
>>> async def f():
...     result = await db.test.bulk_write([
...     DeleteMany({}),  # Remove all documents from the previous example.
...     InsertOne({'_id': 1}),
...     InsertOne({'_id': 2}),
...     InsertOne({'_id': 3}),
...     UpdateOne({'_id': 1}, {'$set': {'foo': 'bar'}}),
...     UpdateOne({'_id': 4}, {'$inc': {'j': 1}}, upsert=True),
...     ReplaceOne({'j': 1}, {'j': 2})])
...     pprint(result.bulk_api_result)
...
>>> IOLoop.current().run_sync(f)
{'nInserted': 3,
 'nMatched': 2,
 'nModified': 2,
 'nRemoved': 10000,
 'nUpserted': 1,
 'upserted': [{'_id': 4, 'index': 5}],
 'writeConcernErrors': [],
 'writeErrors': []}

The first write failure that occurs (e.g. duplicate key error) aborts the remaining operations, and Motor raises BulkWriteError. The details attribute of the exception instance provides the execution results up until the failure occurred and details about the failure - including the operation that caused the failure.

>>> from pymongo import InsertOne, DeleteOne, ReplaceOne
>>> from pymongo.errors import BulkWriteError
>>> async def f():
...     requests = [
...         ReplaceOne({'j': 2}, {'i': 5}),
...         InsertOne({'_id': 4}),  # Violates the unique key constraint on _id.
...         DeleteOne({'i': 5})]
...     try:
...         await db.test.bulk_write(requests)
...     except BulkWriteError as bwe:
...         pprint(bwe.details)
...
>>> IOLoop.current().run_sync(f)
{'nInserted': 0,
 'nMatched': 1,
 'nModified': 1,
 'nRemoved': 0,
 'nUpserted': 0,
 'upserted': [],
 'writeConcernErrors': [],
 'writeErrors': [{'code': 11000,
                  'errmsg': '... duplicate key error ...',
                  'index': 1,
                  'op': {'_id': 4}}]}

Unordered Bulk Write Operations

Unordered bulk write operations are batched and sent to the server in arbitrary order where they may be executed in parallel. Any errors that occur are reported after all operations are attempted.

In the next example the first and third operations fail due to the unique constraint on _id. Since we are doing unordered execution the second and fourth operations succeed.

>>> async def f():
...     requests = [
...         InsertOne({'_id': 1}),
...         DeleteOne({'_id': 2}),
...         InsertOne({'_id': 3}),
...         ReplaceOne({'_id': 4}, {'i': 1})]
...     try:
...         await db.test.bulk_write(requests, ordered=False)
...     except BulkWriteError as bwe:
...         pprint(bwe.details)
...
>>> IOLoop.current().run_sync(f)
{'nInserted': 0,
 'nMatched': 1,
 'nModified': 1,
 'nRemoved': 1,
 'nUpserted': 0,
 'upserted': [],
 'writeConcernErrors': [],
 'writeErrors': [{'code': 11000,
                  'errmsg': '... duplicate key error ...',
                  'index': 0,
                  'op': {'_id': 1}},
                 {'code': 11000,
                  'errmsg': '... duplicate key error ...',
                  'index': 2,
                  'op': {'_id': 3}}]}

Write Concern

Bulk operations are executed with the write_concern of the collection they are executed against. Write concern errors (e.g. wtimeout) will be reported after all operations are attempted, regardless of execution order.

>>> from pymongo import WriteConcern
>>> async def f():
...     coll = db.get_collection(
...         'test', write_concern=WriteConcern(w=4, wtimeout=1))
...     try:
...         await coll.bulk_write([InsertOne({'a': i}) for i in range(4)])
...     except BulkWriteError as bwe:
...         pprint(bwe.details)
...
>>> IOLoop.current().run_sync(f)
{'nInserted': 4,
 'nMatched': 0,
 'nModified': 0,
 'nRemoved': 0,
 'nUpserted': 0,
 'upserted': [],
 'writeConcernErrors': [{'code': 64,
                         'errInfo': {'wtimeout': True},
                         'errmsg': 'waiting for replication timed out'}],
 'writeErrors': []}

Application Performance Monitoring (APM)

Motor implements the same Command Monitoring and Topology Monitoring specifications as other MongoDB drivers. Therefore, you can register callbacks to be notified of every MongoDB query or command your program sends, and the server’s reply to each, as well as getting a notification whenever the driver checks a server’s status or detects a change in your replica set.

Motor wraps PyMongo, and it shares PyMongo’s API for monitoring. To receive notifications about events, you subclass one of PyMongo’s four listener classes, CommandListener, ServerListener, TopologyListener, or ServerHeartbeatListener.

Command Monitoring

Subclass CommandListener to be notified whenever a command starts, succeeds, or fails.

import logging
import sys

from pymongo import monitoring

logging.basicConfig(stream=sys.stdout, level=logging.INFO)


class CommandLogger(monitoring.CommandListener):
    def started(self, event):
        logging.info("Command {0.command_name} with request id "
                     "{0.request_id} started on server "
                     "{0.connection_id}".format(event))

    def succeeded(self, event):
        logging.info("Command {0.command_name} with request id "
                     "{0.request_id} on server {0.connection_id} "
                     "succeeded in {0.duration_micros} "
                     "microseconds".format(event))

    def failed(self, event):
        logging.info("Command {0.command_name} with request id "
                     "{0.request_id} on server {0.connection_id} "
                     "failed in {0.duration_micros} "
                     "microseconds".format(event))

Register an instance of CommandLogger:

monitoring.register(CommandLogger())

You can register any number of listeners, of any of the four listener types.

Although you use only APIs from PyMongo’s monitoring module to configure monitoring, if you create a MotorClient its commands are monitored, the same as a PyMongo MongoClient.

from tornado import gen, ioloop
from motor import MotorClient

client = MotorClient()


async def do_insert():
    await client.test.collection.insert_one({'message': 'hi!'})

    # For this example, wait 10 seconds for more monitoring events to fire.
    await gen.sleep(10)


ioloop.IOLoop.current().run_sync(do_insert)

This logs something like:

Command insert with request id 50073 started on server ('localhost', 27017)
Command insert with request id 50073 on server ('localhost', 27017)
succeeded in 362 microseconds

See PyMongo’s monitoring module for details about the event data your callbacks receive.

Server and Topology Monitoring

Subclass ServerListener to be notified whenever Motor detects a change in the state of a MongoDB server it is connected to.

class ServerLogger(monitoring.ServerListener):
    def opened(self, event):
        logging.info("Server {0.server_address} added to topology "
                     "{0.topology_id}".format(event))

    def description_changed(self, event):
        previous_server_type = event.previous_description.server_type
        new_server_type = event.new_description.server_type
        if new_server_type != previous_server_type:
            logging.info(
                "Server {0.server_address} changed type from "
                "{0.previous_description.server_type_name} to "
                "{0.new_description.server_type_name}".format(event))

    def closed(self, event):
        logging.warning("Server {0.server_address} removed from topology "
                        "{0.topology_id}".format(event))


monitoring.register(ServerLogger())

Subclass TopologyListener to be notified whenever Motor detects a change in the state of your server topology. Examples of such topology changes are a replica set failover, or if you are connected to several mongos servers and one becomes unavailable.

class TopologyLogger(monitoring.TopologyListener):
    def opened(self, event):
        logging.info("Topology with id {0.topology_id} "
                     "opened".format(event))

    def description_changed(self, event):
        logging.info("Topology description updated for "
                     "topology id {0.topology_id}".format(event))
        previous_topology_type = event.previous_description.topology_type
        new_topology_type = event.new_description.topology_type
        if new_topology_type != previous_topology_type:
            logging.info(
                "Topology {0.topology_id} changed type from "
                "{0.previous_description.topology_type_name} to "
                "{0.new_description.topology_type_name}".format(event))

    def closed(self, event):
        logging.info("Topology with id {0.topology_id} "
                     "closed".format(event))


monitoring.register(TopologyLogger())

Motor monitors MongoDB servers with periodic checks called “heartbeats”. Subclass ServerHeartbeatListener to be notified whenever Motor begins a server check, and whenever a check succeeds or fails.

class HeartbeatLogger(monitoring.ServerHeartbeatListener):
    def started(self, event):
        logging.info("Heartbeat sent to server "
                     "{0.connection_id}".format(event))

    def succeeded(self, event):
        logging.info("Heartbeat to server {0.connection_id} "
                     "succeeded with reply "
                     "{0.reply.document}".format(event))

    def failed(self, event):
        logging.warning("Heartbeat to server {0.connection_id} "
                        "failed with error {0.reply}".format(event))


monitoring.register(HeartbeatLogger())

Thread Safety

Watch out: Your listeners’ callbacks are executed on various background threads, not the main thread. To interact with Tornado or Motor from a listener callback, you must defer to the main thread using IOLoop.add_callback, which is the only thread-safe IOLoop method. Similarly, if you use asyncio instead of Tornado, defer your action to the main thread with call_soon_threadsafe(). There is probably no need to be concerned about this detail, however: logging is the only reasonable thing to do from a listener, and the Python logging module is thread-safe.
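
If you do need to act on an event from the main thread, a sketch of deferring with add_callback; the constructor arguments are assumptions of this example:

from tornado.ioloop import IOLoop
from pymongo import monitoring


class LoopNotifier(monitoring.ServerListener):
    def __init__(self, io_loop, on_server_opened):
        self.io_loop = io_loop
        self.on_server_opened = on_server_opened  # Runs on the main thread.

    def opened(self, event):
        # Called on a background thread; add_callback is the only
        # thread-safe IOLoop method.
        self.io_loop.add_callback(self.on_server_opened,
                                  event.server_address)

    def description_changed(self, event):
        pass

    def closed(self, event):
        pass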

Motor Tailable Cursor Example

By default, MongoDB will automatically close a cursor when the client has exhausted all results in the cursor. However, for capped collections you may use a tailable cursor that remains open after the client exhausts the results in the initial cursor.

The following is a basic example of using a tailable cursor to tail the oplog of a replica set member:

from asyncio import sleep

import pymongo
from pymongo.cursor import CursorType

async def tail_oplog_example():
    oplog = client.local.oplog.rs
    first = await oplog.find().sort('$natural', pymongo.ASCENDING).limit(-1).next()
    print(first)
    ts = first['ts']

    while True:
        # For a regular capped collection CursorType.TAILABLE_AWAIT is the
        # only option required to create a tailable cursor. When querying the
        # oplog, the oplog_replay option enables an optimization to quickly
        # find the 'ts' value we're looking for. The oplog_replay option
        # can only be used when querying the oplog. Starting in MongoDB 4.4
        # this option is ignored by the server as queries against the oplog
        # are optimized automatically by the MongoDB query engine.
        cursor = oplog.find({'ts': {'$gt': ts}},
                            cursor_type=CursorType.TAILABLE_AWAIT,
                            oplog_replay=True)
        while cursor.alive:
            async for doc in cursor:
                ts = doc['ts']
                print(doc)
            # We end up here if the find() returned no documents or if the
            # tailable cursor timed out (no new documents were added to the
            # collection for more than 1 second).
            await sleep(1)

See also: Tailable cursors.

Tornado Change Stream Example

Watch a collection for changes with MotorCollection.watch() and display each change notification on a web page using web sockets.

Instructions

Start a MongoDB server on its default port and run this script. Then visit:

http://localhost:8888

Open a mongo shell in the terminal and perform some operations on the “test” collection in the “test” database:

> use test
switched to db test
> db.test.insertOne({})
> db.test.updateOne({}, {$set: {x: 1}})
> db.test.deleteOne({})

The application receives each change notification and displays it as JSON on the web page:

Changes

{'documentKey': {'_id': ObjectId('5a2a6967ea2dcf7b1c721cfb')},
 'fullDocument': {'_id': ObjectId('5a2a6967ea2dcf7b1c721cfb')},
 'ns': {'coll': 'test', 'db': 'test'},
 'operationType': 'insert'}

{'documentKey': {'_id': ObjectId('5a2a6967ea2dcf7b1c721cfb')},
 'ns': {'coll': 'test', 'db': 'test'},
 'operationType': 'update',
 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 1.0}}}

{'documentKey': {'_id': ObjectId('5a2a6967ea2dcf7b1c721cfb')},
 'ns': {'coll': 'test', 'db': 'test'},
 'operationType': 'delete'}

Display change notifications over a web socket

import logging
import os
import sys
from base64 import urlsafe_b64encode
from pprint import pformat

from motor.motor_tornado import MotorClient
from bson import json_util  # Installed with PyMongo.

import tornado.escape
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="reload on source changes")
define("mongo", default="mongodb://localhost", help="MongoDB URI")
define("ns", default="test.test", help="database and collection name")


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", MainHandler),
            (r"/socket", ChangesHandler)]

        templates = os.path.join(os.path.dirname(__file__),
                                 "tornado_change_stream_templates")

        super().__init__(handlers,
                         template_path=templates,
                         template_whitespace="all",
                         debug=options.debug)


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html", changes=ChangesHandler.cache)


class ChangesHandler(tornado.websocket.WebSocketHandler):
    waiters = set()
    cache = []
    cache_size = 5

    def open(self):
        ChangesHandler.waiters.add(self)

    def on_close(self):
        ChangesHandler.waiters.remove(self)

    @classmethod
    def update_cache(cls, change):
        cls.cache.append(change)
        if len(cls.cache) > cls.cache_size:
            cls.cache = cls.cache[-cls.cache_size:]

    @classmethod
    def send_change(cls, change):
        change_json = json_util.dumps(change)
        for waiter in cls.waiters:
            try:
                waiter.write_message(change_json)
            except Exception:
                logging.error("Error sending message", exc_info=True)

    @classmethod
    def on_change(cls, change):
        logging.info("got change of type '%s'", change.get('operationType'))

        # Each change notification has a binary _id. Use it to make an HTML
        # element id, then remove it.
        html_id = urlsafe_b64encode(change['_id']['_data']).decode().rstrip('=')
        change.pop('_id')
        change['html'] = '<div id="change-%s"><pre>%s</pre></div>' % (
            html_id,
            tornado.escape.xhtml_escape(pformat(change)))

        change['html_id'] = html_id
        ChangesHandler.send_change(change)
        ChangesHandler.update_cache(change)


change_stream = None


async def watch(collection):
    global change_stream

    async with collection.watch() as change_stream:
        async for change in change_stream:
            ChangesHandler.on_change(change)


def main():
    tornado.options.parse_command_line()
    if '.' not in options.ns:
        sys.stderr.write('Invalid ns "%s", must contain a "."' % (options.ns,))
        sys.exit(1)

    db_name, collection_name = options.ns.split('.', 1)
    client = MotorClient(options.mongo)
    collection = client[db_name][collection_name]

    app = Application()
    app.listen(options.port)
    loop = tornado.ioloop.IOLoop.current()
    # Start watching collection for changes.
    loop.add_callback(watch, collection)
    try:
        loop.start()
    except KeyboardInterrupt:
        pass
    finally:
        if change_stream is not None:
            change_stream.close()


if __name__ == "__main__":
    main()

Authentication With Motor

This page describes using Motor with Tornado. Beginning in version 0.5 Motor can also integrate with asyncio instead of Tornado.

To use authentication, you must start mongod with --auth or, for replica sets or sharded clusters, --keyFile. Create an admin user and optionally normal users or read-only users.

To create an authenticated connection use a MongoDB connection URI:

uri = "mongodb://user:pass@localhost:27017/database_name"
client = motor.motor_tornado.MotorClient(uri)

Motor logs in to the server on demand, when you first attempt an operation.
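
If the user is defined in a different database from the one your application uses, add the standard authSource option to the URI; this is a MongoDB URI feature, not specific to Motor:

uri = "mongodb://user:pass@localhost:27017/database_name?authSource=admin"
client = motor.motor_tornado.MotorClient(uri)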

AIOHTTPGridFS Example

Serve pre-compressed static content from GridFS over HTTP. Uses the aiohttp web framework and AIOHTTPGridFS.

Instructions

Start a MongoDB server on its default port and run this script. Then visit:

http://localhost:8080/fs/my_file

Serve compressed static content from GridFS

import asyncio
import gzip
import tempfile

import aiohttp.web

from motor.aiohttp import AIOHTTPGridFS
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket

client = AsyncIOMotorClient()


# Use Motor to put compressed data in GridFS, with filename "my_file".
async def put_gridfile():
    with tempfile.TemporaryFile() as tmp:
        with gzip.GzipFile(mode='wb', fileobj=tmp) as gzfile:
            for _ in range(10):
                gzfile.write(b'Nonesuch nonsense\n')

        gfs = AsyncIOMotorGridFSBucket(client.my_database)
        tmp.seek(0)
        await gfs.upload_from_stream(filename='my_file',
                                     source=tmp,
                                     metadata={'contentType': 'text',
                                               'compressed': True})

asyncio.get_event_loop().run_until_complete(put_gridfile())


# Add "Content-Encoding: gzip" header for compressed data.
def gzip_header(response, gridout):
    if gridout.metadata.get('compressed'):
        response.headers['Content-Encoding'] = 'gzip'

gridfs_handler = AIOHTTPGridFS(client.my_database,
                               set_extra_headers=gzip_header)

app = aiohttp.web.Application()

# The GridFS URL pattern must have a "{filename}" variable.
resource = app.router.add_resource('/fs/{filename}')
resource.add_route('GET', gridfs_handler)
resource.add_route('HEAD', gridfs_handler)

aiohttp.web.run_app(app)

See also A Web Application With aiohttp.

Changelog

Motor 2.3.1

Motor 2.3.1 fixes two bugs related to change streams.

Bug-fixes:

Issues Resolved

See the Motor 2.3.1 release notes in JIRA for the complete list of resolved issues in this release.

Motor 2.3

Motor 2.3 adds support for contextvars.

New features:

  • Added support for the contextvars module. Specifically, it is now possible to access context variables inside CommandListener callbacks.

Bug-fixes:

Issues Resolved

See the Motor 2.3 release notes in JIRA for the complete list of resolved issues in this release.

Motor 2.2

Motor 2.2 adds support for MongoDB 4.4 features. It depends on PyMongo 3.11 or later. Motor continues to support MongoDB 3.0 and later. Motor 2.2 also drops support for Python 2.7 and Python 3.4.

New features:

Bug-fixes:

Deprecations:

Issues Resolved

See the Motor 2.2 release notes in JIRA for the complete list of resolved issues in this release.

Motor 2.1

Motor 2.1 adds support for MongoDB 4.2 features. It depends on PyMongo 3.10 or later. Motor continues to support MongoDB 3.0 and later. Motor 2.1 also adds support for Python 3.8.

Motor now offers experimental support for Windows when it is using the asyncio event loop. This means it supports Windows exclusively with Python 3, either integrating with asyncio directly or with Tornado 5 or later: starting in version 5, Tornado uses the asyncio event loop on Python 3 by default.

Additional changes:

  • Support for MongoDB 4.2 sharded transactions. Sharded transactions have the same API as replica set transactions.
  • New method with_transaction() to support conveniently running a transaction in a session with automatic retries and at-most-once semantics.
  • Added the max_commit_time_ms parameter to start_transaction().
  • The retryWrites URI option now defaults to True. Supported write operations that fail with a retryable error will automatically be retried one time, with at-most-once semantics.
  • Support for retryable reads and the retryReads URI option which is enabled by default. See the MongoClient documentation for details. Now that supported operations are retried automatically and transparently, users should consider adjusting any custom retry logic to prevent an application from inadvertently retrying for too long.
  • Support zstandard for wire protocol compression.
  • Support for periodically polling DNS SRV records to update the mongos proxy list without having to change client configuration.
  • New method motor.motor_asyncio.AsyncIOMotorDatabase.aggregate() to support running database level aggregations.
  • Change stream enhancements for MongoDB 4.2:
  • New parameters bucket_name, chunk_size_bytes, write_concern, and read_preference for motor.motor_asyncio.AsyncIOMotorGridFSBucket.
Issues Resolved

See the Motor 2.1 release notes in JIRA for the complete list of resolved issues in this release.

Motor 2.0

Motor 2.0 drops support for MongoDB 2.6 and adds support for MongoDB 4.0 features, including multi-document transactions and change stream notifications on entire databases or entire MongoDB servers. It adds support for Python 3.7. This version of Motor requires PyMongo 3.7 or later.

This is a major release that removes previously deprecated APIs.

To support multi-document transactions, Motor had to make breaking changes to the session API and bump its major version number. Since this is a major release, it also deletes many helper methods and APIs that had been deprecated in the time since Motor 1.0, most notably the old CRUD methods insert, update, remove, and save, and the original callback-based API. Read the Motor 2.0 Migration Guide carefully to upgrade your existing Motor application.

Documentation is updated to warn about obsolete TLS versions, see Configuration. Motor is now tested on Travis in addition to MongoDB’s Evergreen system.

Added support for aiohttp 3.0 and later, and dropped older aiohttp versions. The aiohttp integration now requires Python 3.5+.

The MotorDatabase.add_user and MotorDatabase.remove_user methods are deleted. Manage user accounts with four database commands: createUser, usersInfo, updateUser, and dropUser. You can run any database command with the MotorDatabase.command() method.

The deprecated GridFS classes MotorGridFS and AsyncIOMotorGridFS are deleted in favor of MotorGridFSBucket and AsyncIOMotorGridFSBucket, which conform to driver specs for GridFS.

Additional changes:

Motor 1.3.1

Fix a Python 3.7 compatibility bug caused by importing “async”, which is a keyword in Python 3.7. Drop support for Python 3.4.3 and older.

Motor 1.3.0

Deprecate Motor’s old callback-based async API in preparation for removing it in Motor 2.0. Raise DeprecationWarning whenever a callback is passed.

See the Motor 2.0 Migration Guide.

Motor 1.2.5

Fix a Python 3.7 compatibility bug caused by importing “async”, which is a keyword in Python 3.7. Drop support for Python 3.4.3 and older.

Motor 1.2.4

Fix a Python 3.7 compatibility bug in the MotorChangeStream class returned by MotorCollection.watch(). It is now possible to use change streams in async for loops in Python 3.7.

Motor 1.2.3

Compatibility with latest Sphinx and document how to use the latest TLS protocols.

Motor 1.2.2

Motor 1.2.0 requires PyMongo 3.6 or later. The dependency was properly documented, but not enforced in setup.py. PyMongo 3.6 is now an install-time requirement; thanks to Shane Harvey for the fix.

Motor 1.2.1

An asyncio application that created a Change Stream with MotorCollection.watch() and shut down while the Change Stream was open would print several errors. I have rewritten MotorChangeStream.next() and some Motor internals to allow clean shutdown with asyncio.

Motor 1.2

Motor 1.2 drops support for MongoDB 2.4 and adds support for MongoDB 3.6 features. It depends on PyMongo 3.6 or later. Motor continues to support MongoDB 2.6 and later.

Dropped support for Python 2.6 and 3.3. Motor continues to support Python 2.7 and 3.4+.

Dropped support for Tornado 3. A recent version of Tornado 4 is required.

Dropped support for the Python 3.5.0 and Python 3.5.1 “async for” protocol. Motor allows “async for” with cursors in Python 3.5.2 and later.

See the Compatibility Matrix for the relationships among Motor, Python, Tornado, and MongoDB versions.

Added support for aiohttp 2.0 and later, and dropped older aiohttp versions.

Highlights include:

The maximum number of workers in the thread pool can be overridden with an environment variable, see Configuration.

MotorCollection accepts codec_options, read_preference, write_concern, and read_concern arguments. This is rarely needed; you typically create a MotorCollection from a MotorDatabase, not by calling its constructor directly.
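
For example, a sketch of that typical pattern, deriving a differently-configured collection from a database (the collection name is illustrative):

from pymongo import ReadPreference

collection = db.get_collection(
    'things', read_preference=ReadPreference.SECONDARY)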

Deleted obsolete class motor.Op.

Motor 1.1

Motor depends on PyMongo 3.4 or later. It wraps the latest PyMongo code, which supports the new server features introduced in MongoDB 3.4. (It is a coincidence that the latest MongoDB and PyMongo versions are the same number.)

Highlights include:

Warning

Starting in PyMongo 3.4, bson.code.Code.scope may return None, as the default scope is None instead of {}.

Note

PyMongo 3.4+ attempts to create sockets non-inheritable when possible (i.e. it sets the close-on-exec flag on socket file descriptors). Support is limited to a subset of POSIX operating systems (not including Windows) and the flag usually cannot be set in a single atomic operation. CPython 3.4+ implements PEP 446, creating all file descriptors non-inheritable by default. Users that require this behavior are encouraged to upgrade to CPython 3.4+.

Motor 1.0

Motor now depends on PyMongo 3.3 and later. The move from PyMongo 2 to 3 brings a large number of API changes, so read the PyMongo 3 changelog carefully.

MotorReplicaSetClient is removed

In Motor 1.0, MotorClient is the only class. Connect to a replica set with a “replicaSet” URI option or parameter:

MotorClient("mongodb://hostname/?replicaSet=my-rs")
MotorClient(host, port, replicaSet="my-rs")
New features

New classes MotorGridFSBucket and AsyncIOMotorGridFSBucket conform to the GridFS API Spec for MongoDB drivers. These classes supersede the old MotorGridFS and AsyncIOMotorGridFS. See GridFS changes below, especially note the breaking change in GridFSHandler.

Serve GridFS files over HTTP using aiohttp and AIOHTTPGridFS.

MotorClient changes

Removed:

  • MotorClient.open(); clients have opened themselves automatically on demand since version 0.2.
  • MotorClient.seeds, use pymongo.uri_parser.parse_uri() on your MongoDB URI.
  • MotorClient.alive

Added:

Unix domain socket paths must be quoted with urllib.parse.quote_plus() (or urllib.quote_plus in Python 2) before they are included in a URI:

path = '/tmp/mongodb-27017.sock'
MotorClient('mongodb://%s' % urllib.parse.quote_plus(path))
MotorCollection changes

Added:

New bypass_document_validation parameter for initialize_ordered_bulk_op() and initialize_unordered_bulk_op().

Changes to find() and find_one()

The following find/find_one options have been renamed:

  • spec -> filter
  • fields -> projection
  • partial -> allow_partial_results

These renames only affect your code if you passed these as keyword arguments, like find(fields=['fieldname']). If you passed only positional parameters these changes are not significant for your application.

The following find/find_one options have been added:

  • cursor_type (see CursorType for values)
  • oplog_replay
  • modifiers

The following find/find_one options have been removed:

  • network_timeout (use max_time_ms() instead)
  • read_preference (use with_options() instead)
  • tag_sets (use one of the read preference classes from read_preferences and with_options() instead)
  • secondary_acceptable_latency_ms (use the localThresholdMS URI option instead)
  • max_scan (use the new modifiers option instead)
  • snapshot (use the new modifiers option instead)
  • tailable (use the new cursor_type option instead)
  • await_data (use the new cursor_type option instead)
  • exhaust (use the new cursor_type option instead)
  • as_class (use with_options() with CodecOptions instead)
  • compile_re (BSON regular expressions are always decoded to Regex)

The following find/find_one options are deprecated:

  • manipulate

The following renames need special handling.

  • timeout -> no_cursor_timeout - By default, MongoDB closes a cursor after 10 minutes of inactivity. In previous Motor versions, you disabled the timeout by passing timeout=False to MotorCollection.find() or MotorGridFS.find(). The timeout parameter has been renamed to no_cursor_timeout, it defaults to False, and you must now pass no_cursor_timeout=True to disable timeouts.
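
For example, a sketch of the new spelling:

# Motor 1.0: explicitly disable the server's 10-minute cursor timeout.
cursor = collection.find(no_cursor_timeout=True)
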
GridFS

The old GridFS classes MotorGridFS and AsyncIOMotorGridFS are deprecated in favor of MotorGridFSBucket and AsyncIOMotorGridFSBucket, which comply with MongoDB’s cross-language driver spec for GridFS.

The old classes are still supported, but will be removed in Motor 2.0.

BREAKING CHANGE: The overridable method get_gridfs_file of GridFSHandler now takes a MotorGridFSBucket, not a MotorGridFS. It also takes an additional request parameter.

MotorGridIn

New method MotorGridIn.abort().

In a Python 3.5 native coroutine, the “async with” statement calls close() automatically:

async def upload():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    async with await fs.new_file() as gridin:
        await gridin.write(b'First part\n')
        await gridin.write(b'Second part')

    # gridin is now closed automatically.
MotorGridOut

MotorGridOut is now an async iterable, so reading a chunk at a time is much simpler with a Python 3 native coroutine:

async def read_file(file_id):
    fs = motor.motor_tornado.MotorGridFS(db)
    gridout = await fs.get(file_id)

    async for chunk in gridout:
        sys.stdout.buffer.write(chunk)  # chunks are bytes

    sys.stdout.flush()
Documentation

The Motor asyncio API is now fully documented, side by side with the Motor Tornado API.

New Developer Guide added.

Motor 0.7

For asynchronous I/O Motor now uses a thread pool, which is faster and simpler than the prior implementation with greenlets. It no longer requires the greenlet package, and now requires the futures backport package on Python 2.

This version updates the PyMongo dependency from 2.8.0 to 2.9.x, and wraps PyMongo 2.9’s new APIs.

Most of Motor 1.0’s API is now implemented, and APIs that will be removed in Motor 1.0 are now deprecated and raise warnings.

MotorClient changes

The get_database method is added for getting a MotorDatabase instance with its options configured differently than the MotorClient’s.

New read-only attributes:

MotorReplicaSetClient changes

The get_database() method is added for getting a MotorDatabase instance with its options configured differently than the MotorReplicaSetClient’s.

New read-only attributes:

  • codec_options
  • local_threshold_ms
MotorDatabase changes

The get_collection() method is added for getting a MotorCollection instance with its options configured differently than the MotorDatabase’s.

The connection property is deprecated in favor of a new read-only attribute client.

New read-only attribute:

MotorCollection changes

The with_options() method is added for getting a MotorCollection instance with its options configured differently than this MotorCollection’s.

New read-only attribute:

The following methods wrap PyMongo’s implementation of the standard CRUD API Spec for MongoDB Drivers:

These new methods do not apply SON Manipulators.

GridFS changes

New MotorGridOutCursor methods:

  • add_option()
  • remove_option()
  • clone()

Added MotorGridOut documentation:

Bugfix

MOTOR-124: an import deadlock in Python 2 and Tornado 3 led to an AutoReconnect exception with some replica sets.

Motor 0.6.2

Fix “from motor import *” for Python 3.

Motor 0.6.1

Fix source distribution, which hadn’t included the “frameworks” submodules.

Motor 0.6

This is a bugfix release. Fixing these bugs has introduced tiny API changes that may affect some programs.

motor_asyncio and motor_tornado submodules

These modules have been moved from:

  • motor_asyncio.py
  • motor_tornado.py

To:

  • motor_asyncio/__init__.py
  • motor_tornado/__init__.py

Motor had to make this change in order to omit the motor_asyncio submodule entirely and avoid a spurious SyntaxError being printed when installing in Python 2. The change should be invisible to application code.

Database and collection names with leading underscores

A database or collection whose name starts with an underscore can no longer be accessed as a property:

# Now raises AttributeError.
db = MotorClient()._mydatabase
collection = db._mycollection
subcollection = collection._subcollection

Such databases and collections can still be accessed dict-style:

# Continues to work the same as previous Motor versions.
db = MotorClient()['_mydatabase']
collection = db['_mycollection']

To ensure a “sub-collection” with a name that includes an underscore is accessible, Motor collections now allow dict-style access, the same as Motor clients and databases always have:

# New in Motor 0.6
subcollection = collection['_subcollection']

These changes solve problems with IPython code completion and the Python 3 ABC abstract base class.

Motor 0.5

asyncio

Motor can now integrate with asyncio, as an alternative to Tornado. My gratitude to Rémi Jolin, Andrew Svetlov, and Nikolay Novik for their huge contributions to Motor’s asyncio integration.

Python 3.5

Motor is now compatible with Python 3.5, which required some effort. Motor not only supports users’ coroutines, it uses coroutines to implement some of its own features, like open() and put(). There is no single way to return a value from a Python 3.5 native coroutine or a Python 2 generator-based coroutine, so Motor internal coroutines that return values were rewritten. (See commit message dc19418c for an explanation.)

async and await

Motor now supports Python 3.5 native coroutines, written with the async and await syntax:

async def f():
    await collection.insert({'_id': 1})

Cursors from find() or aggregate() can be iterated elegantly and very efficiently in native coroutines with async for:

async def f():
    async for doc in collection.find():
        do_something_with(doc)
aggregate()

MotorCollection.aggregate() now returns a cursor by default, and the cursor is returned immediately without a yield. The old syntax is no longer supported:

# Motor 0.4 and older, no longer supported.
cursor = yield collection.aggregate(pipeline, cursor={})
while (yield cursor.fetch_next):
    doc = cursor.next_object()
    print(doc)

In Motor 0.5, simply do:

# Motor 0.5: no "cursor={}", no "yield".
cursor = collection.aggregate(pipeline)
while (yield cursor.fetch_next):
    doc = cursor.next_object()
    print(doc)

Or with Python 3.5 and later:

# Motor 0.5, Python 3.5.
async for doc in collection.aggregate(pipeline):
    print(doc)

MongoDB versions 2.4 and older do not support aggregation cursors. For compatibility with older MongoDBs, aggregate() now takes an argument cursor=False, and returns a Future that you can yield to get all the results in one document:

# Motor 0.5 with MongoDB 2.4 and older.
reply = yield collection.aggregate(pipeline, cursor=False)
for doc in reply['results']:
    print(doc)
Deprecations

Motor 0.5 deprecates a large number of APIs that will be removed in version 1.0:

MotorClient:
  • ~MotorClient.host
  • ~MotorClient.port
  • ~MotorClient.document_class
  • ~MotorClient.tz_aware
  • ~MotorClient.secondary_acceptable_latency_ms
  • ~MotorClient.tag_sets
  • ~MotorClient.uuid_subtype
  • ~MotorClient.disconnect
  • ~MotorClient.alive
MotorReplicaSetClient:
  • ~MotorReplicaSetClient.document_class
  • ~MotorReplicaSetClient.tz_aware
  • ~MotorReplicaSetClient.secondary_acceptable_latency_ms
  • ~MotorReplicaSetClient.tag_sets
  • ~MotorReplicaSetClient.uuid_subtype
  • ~MotorReplicaSetClient.alive
MotorDatabase:
  • ~MotorDatabase.secondary_acceptable_latency_ms
  • ~MotorDatabase.tag_sets
  • ~MotorDatabase.uuid_subtype
MotorCollection:
  • ~MotorCollection.secondary_acceptable_latency_ms
  • ~MotorCollection.tag_sets
  • ~MotorCollection.uuid_subtype
Cursor slicing

Cursors can no longer be indexed like cursor[n] or sliced like cursor[start:end], see MOTOR-84. If you wrote code like this:

cursor = collection.find()[i]
yield cursor.fetch_next
doc = cursor.next_object()

Then instead, write:

cursor = collection.find().skip(i).limit(-1)
yield cursor.fetch_next
doc = cursor.next_object()

The negative limit ensures the server closes the cursor after one result, saving Motor the work of closing it. See cursor.limit.

SSL hostname validation error

When you use Motor with Tornado and SSL hostname validation fails, Motor used to raise a ConnectionFailure with a useful message like “hostname ‘X’ doesn’t match ‘Y’”. The message is now empty and Tornado logs a warning instead.

Configuring uuid_subtype

You can now get and set uuid_subtype on MotorClient, MotorReplicaSetClient, and MotorDatabase instances, not just on MotorCollection.

Motor 0.4.1

Fix MOTOR-66, deadlock when initiating MotorReplicaSetClient connection from multiple operations at once.

Motor 0.4

Supports MongoDB 3.0. In particular, supports MongoDB 3.0’s new SCRAM-SHA-1 authentication mechanism and updates the implementations of MotorClient.database_names() and MotorDatabase.collection_names().

Updates PyMongo dependency from 2.7.1 to 2.8, therefore inheriting PyMongo 2.7.2’s bug fixes and PyMongo 2.8’s bug fixes and features.

Fixes a connection-pool timeout when waitQueueMultipleMS is set and two bugs in replica set monitoring.

The copy_database method has been removed. It was overly complex and no one used it, see MOTOR-56. You can still use the MotorDatabase.command() method directly. The only scenario not supported is copying a database from one host to another, if the remote host requires authentication. For this, use PyMongo’s copy_database method, or, since PyMongo’s copy_database will be removed in a future release too, use the mongo shell.

Motor 0.3.3

Fix MOTOR-45, a stack-context leak in domain name resolution that could lead to an infinite loop and rapid memory leak.

Document Motor’s Requirements in detail.

Motor 0.3.2

Fix MOTOR-44, a socket leak in MotorClient.copy_database and MotorReplicaSetClient.copy_database.

Motor 0.3.1

Fix MOTOR-43, a TypeError when using GridFSHandler with a timezone-aware MotorClient.

Fix GridFS examples that hadn’t been updated for Motor 0.2’s new syntax.

Fix a unittest that hadn’t been running.

Motor 0.3

No new features.

  • Updates PyMongo dependency from 2.7 to 2.7.1, therefore inheriting PyMongo 2.7.1’s bug fixes.
  • Motor continues to support Python 2.6, 2.7, 3.3, and 3.4, but now with single-source. 2to3 no longer runs during installation with Python 3.
  • nosetests is no longer required for regular Motor tests.
  • Fixes a mistake in the docstring for aggregate().

Motor 0.2.1

Fixes two bugs:

  • MOTOR-32: The documentation for MotorCursor.close() claimed it immediately halted execution of MotorCursor.each(), but it didn’t.
  • MOTOR-33: An incompletely iterated cursor’s __del__ method sometimes got stuck and cost 100% CPU forever, even though the application was still responsive.

Motor 0.2

This version includes API changes that break backward compatibility with applications written for Motor 0.1. For most applications, the migration chores will be minor. In exchange, Motor 0.2 offers a cleaner style, and it wraps the new and improved PyMongo 2.7 instead of 2.5.

Changes in Dependencies

Motor now requires PyMongo 2.7.0 exactly and Tornado 3 or later. It drops support for Python 2.5 since Tornado 3 has dropped it.

Motor continues to work with Python 2.6 through 3.4. It still requires Greenlet.

API Changes
open_sync

The open_sync method has been removed from MotorClient and MotorReplicaSetClient. Clients now connect to MongoDB automatically on first use. Simply delete the call to open_sync from your application.

If it’s important to test that MongoDB is available before continuing your application’s startup, use IOLoop.run_sync:

loop = tornado.ioloop.IOLoop.current()
client = motor.MotorClient(host, port)
try:
    loop.run_sync(client.open)
except pymongo.errors.ConnectionFailure:
    print "Can't connect"

Similarly, calling MotorGridOut.open() is now optional. MotorGridIn and MotorGridFS no longer have an open method at all.

Futures

Motor 0.2 takes advantage of Tornado’s tidy new coroutine syntax:

# Old style:
document = yield motor.Op(collection.find_one, {'_id': my_id})

# New style:
document = yield collection.find_one({'_id': my_id})

To make this possible, Motor asynchronous methods (except MotorCursor.each()) now return a Future.

Using Motor with callbacks is still possible: If a callback is passed, it will be executed with the (result, error) of the operation, same as in Motor 0.1:

def callback(document, error):
    if error:
        logging.error("Oh no!")
    else:
        print document

collection.find_one({'_id': my_id}, callback=callback)

If no callback is passed, a Future is returned that resolves to the method’s result or error:

document = yield collection.find_one({'_id': my_id})

motor.Op works the same as before, but it’s deprecated.

WaitOp and WaitAllOps have been removed. Code that used them can now yield a Future or a list of them. Consider this function written for Tornado 2 and Motor 0.1:

@gen.engine
def get_some_documents():
    cursor = collection.find().sort('_id').limit(2)
    cursor.to_list(callback=(yield gen.Callback('key')))
    do_something_while_we_wait()
    try:
        documents = yield motor.WaitOp('key')
        print documents
    except Exception, e:
        print e

The function now becomes:

@gen.coroutine
def f():
    cursor = collection.find().sort('_id').limit(2)
    future = cursor.to_list(2)
    do_something_while_we_wait()
    try:
        documents = yield future
        print documents
    except Exception, e:
        print e

Similarly, a function written like so in the old style:

@gen.engine
def get_two_documents_in_parallel(collection):
    collection.find_one(
        {'_id': 1}, callback=(yield gen.Callback('one')))

    collection.find_one(
        {'_id': 2}, callback=(yield gen.Callback('two')))

    try:
        doc_one, doc_two = yield motor.WaitAllOps(['one', 'two'])
        print doc_one, doc_two
    except Exception, e:
        print e

Now becomes:

@gen.coroutine
def get_two_documents_in_parallel(collection):
    future_0 = collection.find_one({'_id': 1})
    future_1 = collection.find_one({'_id': 2})

    try:
        doc_one, doc_two = yield [future_0, future_1]
        print doc_one, doc_two
    except Exception, e:
        print e
to_list

Any calls to MotorCursor.to_list() that omitted the length argument must now include it:

result = yield collection.find().to_list(100)

None is acceptable, meaning “unlimited.” Use with caution.

Connection Pooling

MotorPool has been rewritten. It supports the new options introduced in PyMongo 2.6, and drops all Motor-specific options.

MotorClient and MotorReplicaSetClient have an option max_pool_size. It used to mean “minimum idle sockets to keep open”, but its meaning has changed to “maximum sockets open per host.” Once this limit is reached, operations will pause waiting for a socket to become available. Therefore the default has been raised from 10 to 100. If you pass a value for max_pool_size make sure it’s large enough for the expected load. (Sockets are only opened when needed, so there’s no cost to having a max_pool_size larger than necessary. Err towards a larger value.) If you’ve been accepting the default, continue to do so.

max_pool_size is now synonymous with Motor’s special max_concurrent option, so max_concurrent has been removed.

max_wait_time has been renamed waitQueueTimeoutMS for consistency with PyMongo. If you pass max_wait_time, rename it and multiply by 1000.

The MotorPoolTimeout exception is gone; catch PyMongo’s ConnectionFailure instead.
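
For example, a sketch configuring the renamed pool options under Motor 0.2 (the values are illustrative):

client = motor.MotorClient(
    'mongodb://localhost:27017',
    max_pool_size=200,         # maximum sockets open per host
    waitQueueTimeoutMS=10000)  # formerly max_wait_time=10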

DNS

Motor can take advantage of Tornado 3’s asynchronous resolver interface. By default, Motor still uses blocking DNS, but you can enable non-blocking lookup with a threaded resolver:

from tornado.netutil import Resolver

Resolver.configure('tornado.netutil.ThreadedResolver')

Or install pycares and use the c-ares resolver:

Resolver.configure('tornado.platform.caresresolver.CaresResolver')
MotorCursor.tail

The MotorCursor.tail method has been removed. It was complex, diverged from PyMongo’s feature set, and encouraged overuse of MongoDB capped collections as message queues when a purpose-built message queue is more appropriate. An example of tailing a capped collection is provided instead: Motor Tailable Cursor Example.

MotorClient.is_locked

is_locked has been removed since calling it from Motor would be bizarre. If you called MotorClient.is_locked like:

locked = yield motor.Op(client.is_locked)

you should now do:

result = yield client.admin.current_op()
locked = bool(result.get('fsyncLock', None))

The result is True only if an administrator has called fsyncLock on the mongod. It is unlikely that you have any use for this.

GridFSHandler

get_gridfs_file() now returns a Future instead of accepting a callback.

New Features

The introduction of a Futures-based API is the most pervasive new feature. In addition Motor 0.2 includes new features from PyMongo 2.6 and 2.7:

Bugfixes

MotorReplicaSetClient.open threw an error if called without a callback.

MotorCursor.to_list ignored SON manipulators. (Thanks to Eren Güven for the report and the fix.)

The full list is in Jira.

Motor 0.1.2

Fixes innocuous unittest failures when running against Tornado 3.1.1.

Motor 0.1.1

Fixes issue MOTOR-12 by pinning its PyMongo dependency to PyMongo version 2.5.0 exactly.

Motor relies on some of PyMongo’s internal details, so changes to PyMongo can break Motor, and a change in PyMongo 2.5.1 did. Eventually PyMongo will expose stable hooks for Motor to use, but for now I changed Motor’s dependency from PyMongo>=2.4.2 to PyMongo==2.5.0.

Motor 2.0 Migration Guide

Motor 2.0 brings a number of changes to Motor 1.0’s API. The major version is required in order to update the session API to support multi-document transactions, introduced in MongoDB 4.0; this feature is so valuable that it motivated me to make the breaking change and bump the version number to 2.0. Since this is the first major version number in almost two years, it removes a large number of APIs that have been deprecated in the time since Motor 1.0.

Follow this guide to migrate an existing application that had used Motor 1.x.

Check compatibility

Read the Requirements page and ensure your MongoDB server and Python interpreter are compatible, and your Tornado version if you are using Tornado. If you use aiohttp, upgrade to at least 3.0.

Upgrade to Motor 1.3

The first step in migrating to Motor 2.0 is to upgrade to at least Motor 1.3. If your project has a requirements.txt file, add the line:

motor >= 1.3, < 2.0

Enable Deprecation Warnings

Starting with Motor 1.3, DeprecationWarning is raised by most methods removed in Motor 2.0. Make sure you enable runtime warnings to see where deprecated functions and methods are being used in your application:

python -Wd <your application>

Warnings can also be changed to errors:

python -Wd -Werror <your application>

Migrate from deprecated APIs

The following features are deprecated by PyMongo and scheduled for removal; they are now deleted from Motor:

Migrate from the original callback API

Motor was first released before Tornado had introduced Futures, generator-based coroutines, and the yield syntax, and long before the async features developed during Python 3’s career. Therefore Motor’s original asynchronous API used callbacks:

def callback(result, error):
    if error:
        print(error)
    else:
        print(result)

collection.find_one({}, callback=callback)

Callbacks have been largely superseded by a Futures API intended for use with coroutines, see Tutorial: Using Motor With Tornado. You can still use callbacks with Motor when appropriate but you must add the callback to a Future instead of passing it as a parameter:

def callback(future):
    try:
        result = future.result()
        print(result)
    except Exception as exc:
        print(exc)

future = collection.find_one({})
future.add_done_callback(callback)

The add_done_callback() call can be placed on the same line:

collection.find_one({}).add_done_callback(callback)

In almost all cases the modern coroutine API is more readable and provides better exception handling:

async def do_find():
    try:
        result = await collection.find_one({})
        print(result)
    except Exception as exc:
        print(exc)

Upgrade to Motor 2.0

Once your application runs without deprecation warnings with Motor 1.3, upgrade to Motor 2.0. Update any calls in your code to MotorClient.start_session() or end_session() to handle the following change.

MotorClient.start_session() is a coroutine

In the past, you could use a client session like:

session = client.start_session()
doc = await client.db.collection.find_one({}, session=session)
session.end_session()

Or:

with client.start_session() as session:
   doc = await client.db.collection.find_one({}, session=session)

To support multi-document transactions, in Motor 2.0 MotorClient.start_session() is a coroutine, not a regular method. It must be used like await client.start_session() or async with await client.start_session(). The coroutine now returns a new class MotorClientSession, not PyMongo’s ClientSession. The end_session method on the returned MotorClientSession is also now a coroutine instead of a regular method. Use it like:

session = await client.start_session()
doc = await client.db.collection.find_one({}, session=session)
await session.end_session()

Or:

async with client.start_session() as session:
   doc = await client.db.collection.find_one({}, session=session)

Developer Guide

Some explanations for those who would like to contribute to Motor development.

Compatibility

Motor supports the asyncio module in the standard library of Python 3.5.3 and later. Motor also works with Tornado 5.0 and later along with all the Python versions it supports.

Each new Motor feature release depends on the latest PyMongo minor version release or newer, up to the next PyMongo major version release. For example, if 3.10 is the latest available PyMongo version when Motor 2.1 is being released, Motor 2.1 will require 3.10<=PyMongo<4.
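
Expressed as a pip requirement, that constraint for Motor 2.1 would read:

pymongo >= 3.10, < 4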

Frameworks

Motor abstracts the differences between Tornado and asyncio by wrapping each in a “framework” interface. A Motor framework is a module implementing these properties and functions:

  • CLASS_PREFIX
  • add_future
  • call_soon
  • chain_future
  • chain_return_value
  • check_event_loop
  • coroutine (DEPRECATED)
  • get_event_loop
  • get_future
  • is_event_loop
  • is_future
  • platform_info
  • pymongo_class_wrapper
  • run_on_executor
  • yieldable (DEPRECATED)

See the frameworks/tornado and frameworks/asyncio modules.

Note

Starting in Motor 2.2, the functions marked DEPRECATED in the list above are not used internally in Motor. Instead of being removed from the codebase, they have been left in a deprecated state to avoid breaking any libraries built on top of Motor. These deprecated functions will be removed in Motor 3.0.

A framework-specific class, like MotorClient for Tornado or AsyncIOMotorClient for asyncio, is created by the create_class_with_framework function, which combines a framework with a framework-agnostic class, in this case AgnosticClient.
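
A condensed sketch of that wiring, following the layout of Motor's source; treat the exact module paths and signature as an assumption for illustration:

from motor import core
from motor.frameworks import tornado as tornado_framework
from motor.metaprogramming import create_class_with_framework

# Mirrors what motor/motor_tornado.py does at import time.
MotorClient = create_class_with_framework(
    core.AgnosticClient, tornado_framework, 'motor.motor_tornado')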

Wrapping PyMongo

For each PyMongo class, Motor declares an equivalent framework-agnostic class. For example, the AgnosticClient class is a framework-agnostic equivalent to PyMongo’s MongoClient. This agnostic class declares each method and property of the PyMongo class that it intends to wrap. These methods and properties begin life as type MotorAttributeFactory.

When create_class_with_framework creates a framework-specific class from an agnostic class, it creates methods and properties for that class which wrap the equivalent PyMongo methods and properties.

For example, the AgnosticClient class declares that drop_database is an AsyncCommand, which is a subclass of MotorAttributeFactory. At import time, create_class_with_framework calls the create_attribute method of each MotorAttributeFactory on the AgnosticClient, which results in framework-specific implementations of each method and property. So at import time, create_class_with_framework generates framework-specific wrappers of drop_database for MotorClient and AsyncIOMotorClient. These wrappers use framework-specific features to run the drop_database method asynchronously.

Asynchronization

This is the heart of Motor’s implementation. The create_attribute method for asynchronous methods like drop_database wraps the equivalent PyMongo method in a Motor method. This wrapper method uses either the Tornado or asyncio framework to:

  • get a reference to the framework’s event loop
  • start the PyMongo method on a thread in the global ThreadPoolExecutor
  • create a Future that will be resolved by the event loop when the thread finishes
  • return the Future to the caller

This is what allows Tornado or asyncio awaitables to call Motor methods with await to await I/O without blocking the event loop.
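
In outline, the pattern resembles this simplified asyncio-only sketch; it is not Motor's actual implementation:

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor()

def asynchronize(pymongo_method):
    # Wrap a blocking PyMongo method so callers can await it.
    @functools.wraps(pymongo_method)
    def method(*args, **kwargs):
        # Run the blocking call on a worker thread; the returned Future
        # is resolved by the event loop when the thread finishes.
        loop = asyncio.get_event_loop()
        return loop.run_in_executor(
            _executor, functools.partial(pymongo_method, *args, **kwargs))
    return method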

Synchro

A common kind of bug in Motor arises when PyMongo adds a feature, like a new method or new optional behavior, which we forget to wrap with Motor.

Since PyMongo adds a test to its suite for each new feature, we could catch these omissions by applying PyMongo’s latest tests to Motor. Then a missing method or feature would cause an obvious test failure. But PyMongo is synchronous and Motor is async; how can Motor pass PyMongo’s tests?

Synchro is a hacky little module that re-synchronizes all Motor methods using the Tornado IOLoop’s run_sync method. synchrotest.py overrides the Python interpreter’s import machinery to allow Synchro to masquerade as PyMongo, and runs PyMongo’s test suite against it. Use tox -e synchro37 to check out PyMongo’s test suite and run it with Synchro.

Contributors

The following is a list of people who have contributed to Motor. If you belong here and are missing please let us know (or send a pull request after adding yourself to the list):

  • A. Jesse Jiryu Davis
  • Eren Güven
  • Jorge Puente Sarrín
  • Rémi Jolin
  • Andrew Svetlov
  • Nikolay Novik
  • Prashant Mital
  • Shane Harvey
  • Bulat Khasanov

Classes

Motor Tornado API

MotorClient – Connection to MongoDB

class motor.motor_tornado.MotorClient(*args, **kwargs)

Create a new connection to a single MongoDB instance at host:port.

Takes the same constructor arguments as MongoClient, as well as:

Parameters:
  • io_loop (optional): Special event loop instance to use instead of default
client[db_name] || client.db_name

Get the db_name MotorDatabase on MotorClient client.

Raises InvalidName if an invalid database name is used.

coroutine drop_database(name_or_database, session=None)

Drop a database.

Raises TypeError if name_or_database is not an instance of basestring (str in Python 3) or Database.

Parameters:
  • name_or_database: the name of a database to drop, or a Database instance representing the database to drop
  • session (optional): a ClientSession.

Note

The write_concern of this client is automatically applied to this operation when using MongoDB >= 3.4.

coroutine fsync(**kwargs)

DEPRECATED: Flush all pending writes to datafiles.

Optional parameters can be passed as keyword arguments:
  • lock: If True lock the server to disallow writes.
  • async: If True don’t block while synchronizing.
  • session (optional): a ClientSession, created with start_session().

Note

Starting with Python 3.7 async is a reserved keyword. The async option to the fsync command can be passed using a dictionary instead:

options = {'async': True}
await client.fsync(**options)

Deprecated. Run the fsync command directly with command() instead. For example:

await client.admin.command('fsync', lock=True)

Changed in version 2.2: Deprecated.

Changed in version 1.2: Added session parameter.

Warning

async and lock cannot be used together.

Warning

MongoDB does not support the async option on Windows and will raise an exception on that platform.

get_database(name=None, codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a MotorDatabase with the given name and options.

Useful for creating a MotorDatabase with different codec options, read preference, and/or write concern from this MotorClient.

>>> from pymongo import ReadPreference
>>> client.read_preference == ReadPreference.PRIMARY
True
>>> db1 = client.test
>>> db1.read_preference == ReadPreference.PRIMARY
True
>>> db2 = client.get_database(
...     'test', read_preference=ReadPreference.SECONDARY)
>>> db2.read_preference == ReadPreference.SECONDARY
True
get_default_database(default=None, codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get the database named in the MongoDB connection URI.

>>> uri = 'mongodb://host/my_database'
>>> client = MotorClient(uri)
>>> db = client.get_default_database()
>>> assert db.name == 'my_database'
>>> db = client.get_default_database('fallback_db_name')
>>> assert db.name == 'my_database'
>>> uri_without_database = 'mongodb://host/'
>>> client = MotorClient(uri_without_database)
>>> db = client.get_default_database('fallback_db_name')
>>> assert db.name == 'fallback_db_name'

Useful in scripts where you want to choose which database to use based only on the URI in a configuration file.


New in version 2.1: Revived this method. Added the default, codec_options, read_preference, write_concern and read_concern parameters.

Changed in version 2.0: Removed this method.

coroutine list_database_names(session=None)

Get a list of the names of all databases on the connected server.

Parameters:
  • session (optional): a ClientSession.
coroutine list_databases(session=None, **kwargs)

Get a cursor over the databases of the connected server.

Parameters:
  • session (optional): a ClientSession.
  • **kwargs (optional): Optional parameters of the listDatabases command can be passed as keyword arguments to this method. The supported options differ by server version.
Returns:

An instance of CommandCursor.
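
For example, a brief usage sketch:

async def show_databases():
    cursor = await client.list_databases()
    async for info in cursor:
        print(info['name'])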

coroutine server_info(session=None)

Get information about the MongoDB server we’re connected to.

Parameters:
  • session (optional): a ClientSession.
coroutine start_session(causal_consistency=True, default_transaction_options=None)

Start a logical session.

This method takes the same parameters as PyMongo’s SessionOptions. See the client_session module for details.

This session is created uninitialized; use it in an await expression to initialize it, or in an async with statement.

async def coro():
    collection = client.db.collection

    # End the session after using it.
    s = await client.start_session()
    await s.end_session()

    # Or, use an "async with" statement to end the session
    # automatically.
    async with await client.start_session() as s:
        doc = {'_id': ObjectId(), 'x': 1}
        await collection.insert_one(doc, session=s)

        secondary = collection.with_options(
            read_preference=ReadPreference.SECONDARY)

        # Sessions are causally consistent by default, so we can read
        # the doc we just inserted, even reading from a secondary.
        async for doc in secondary.find(session=s):
            print(doc)
            
    # Run a multi-document transaction:
    async with await client.start_session() as s:
        # Note, start_transaction doesn't require "await".
        async with s.start_transaction():
            await collection.delete_one({'x': 1}, session=s)
            await collection.insert_one({'x': 2}, session=s)
        
        # Exiting the "with s.start_transaction()" block while throwing an
        # exception automatically aborts the transaction, exiting the block
        # normally automatically commits it.

        # You can run additional transactions in the same session, so long as 
        # you run them one at a time.
        async with s.start_transaction():
            await collection.insert_one({'x': 3}, session=s)
            await collection.update_many({'x': {'$gte': 2}},
                                         {'$inc': {'x': 1}},
                                         session=s)

Requires MongoDB 3.6. Do not use the same session for multiple operations concurrently. A MotorClientSession may only be used with the MotorClient that started it.

Returns:An instance of MotorClientSession.

Changed in version 2.0: Returns a MotorClientSession. Before, this method returned a PyMongo ClientSession.

New in version 1.2.

coroutine unlock(session=None)

DEPRECATED: Unlock a previously locked server.

Parameters:
  • session (optional): a ClientSession.

Deprecated. Users of MongoDB version 3.2 or newer can run the fsyncUnlock command directly with command():

await motor_client.admin.command('fsyncUnlock')

Users of MongoDB version 3.0 can query the “unlock” virtual collection:

await motor_client.admin["$cmd.sys.unlock"].find_one()

Changed in version 2.2: Deprecated.

watch(pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None)

Watch changes on this cluster.

Returns a MotorChangeStream cursor which iterates over changes on all databases in this cluster. Introduced in MongoDB 4.0.

See the documentation for MotorCollection.watch() for more details and examples.

Parameters:
  • pipeline (optional): A list of aggregation pipeline stages to append to an initial $changeStream stage. Not all pipeline stages are valid after a $changeStream stage, see the MongoDB documentation on change streams for the supported stages.
  • full_document (optional): The fullDocument option to pass to the $changeStream stage. Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  • resume_after (optional): A resume token. If provided, the change stream will start returning changes that occur directly after the operation specified in the resume token. A resume token is the _id value of a change document.
  • max_await_time_ms (optional): The maximum time in milliseconds for the server to wait for changes before responding to a getMore operation.
  • batch_size (optional): The maximum number of documents to return per batch.
  • collation (optional): The Collation to use for the aggregation.
  • start_at_operation_time (optional): If provided, the resulting change stream will only return changes that occurred at or after the specified Timestamp. Requires MongoDB >= 4.0.
  • session (optional): a ClientSession.
  • start_after (optional): The same as resume_after except that start_after can resume notifications after an invalidate event. This option and resume_after are mutually exclusive.
Returns:

A MotorChangeStream.

Changed in version 2.1: Added the start_after parameter.

New in version 2.0.

See also

The MongoDB documentation on

changeStreams
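
As a brief usage sketch (assumes MongoDB 4.0 or later):

async def watch_cluster():
    async with client.watch() as stream:
        async for change in stream:
            print(change)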

HOST

The default host name, 'localhost', used when a MotorClient is created with no host argument.

PORT

The default port, 27017, used when a MotorClient is created with no port argument.

address

(host, port) of the current standalone, primary, or mongos, or None.

Accessing address raises InvalidOperation if the client is load-balancing among mongoses, since there is no single address. Use nodes instead.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

New in version 3.0.

arbiters

Arbiters in the replica set.

A sequence of (host, port) pairs. Empty if this client is not connected to a replica set, there are no arbiters, or this client was created without the replicaSet option.

close

Cleanup client resources and disconnect from MongoDB.

On MongoDB >= 3.6, end all server sessions created by this client by sending one or more endSessions commands.

Close all sockets in the connection pools and stop the monitor threads. If this instance is used again it will be automatically re-opened and the threads restarted unless auto encryption is enabled. A client enabled with auto encryption cannot be used again after being closed; any attempt will raise InvalidOperation.

Changed in version 3.6: End all server sessions created by this client.

codec_options

Read only access to the CodecOptions of this instance.

event_listeners

The event listeners registered for this client.

See monitoring for details.

is_mongos

If this client is connected to mongos. If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

is_primary

If this client is connected to a server that can accept writes.

True if the current server is a standalone, mongos, or the primary of a replica set. If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

local_threshold_ms

The local threshold for this instance.

max_bson_size

The largest BSON object the connected server accepts in bytes.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

max_idle_time_ms

The maximum number of milliseconds that a connection can remain idle in the pool before being removed and replaced. Defaults to None (no limit).

max_message_size

The largest message the connected server accepts in bytes.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

max_pool_size

The maximum allowable number of concurrent connections to each connected server. Requests to a server will block if there are maxPoolSize outstanding connections to the requested server. Defaults to 100. Cannot be 0.

When a server’s pool has reached max_pool_size, operations for that server block waiting for a socket to be returned to the pool. If waitQueueTimeoutMS is set, a blocked operation will raise ConnectionFailure after a timeout. By default waitQueueTimeoutMS is not set.

max_write_batch_size

The maxWriteBatchSize reported by the server.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

Returns a default value when connected to server versions prior to MongoDB 2.6.

min_pool_size

The minimum required number of concurrent connections that the pool will maintain to each connected server. Default is 0.

nodes

Set of all currently connected servers.

Warning

When connected to a replica set the value of nodes can change over time as MongoClient’s view of the replica set changes. nodes can also be an empty set when MongoClient is first instantiated and hasn’t yet connected to any servers, or when a network partition causes it to lose connection to all servers.

primary

The (host, port) of the current primary of the replica set.

Returns None if this client is not connected to a replica set, there is no primary, or this client was created without the replicaSet option.

New in version 3.0: MongoClient gained this property in version 3.0 when MongoReplicaSetClient’s functionality was merged in.

read_concern

Read only access to the ReadConcern of this instance.

New in version 3.2.

read_preference

Read only access to the read preference of this instance.

Changed in version 3.0: The read_preference attribute is now read only.

retry_reads

If this instance should retry supported read operations.

retry_writes

If this instance should retry supported write operations.

secondaries

The secondary members known to this client.

A sequence of (host, port) pairs. Empty if this client is not connected to a replica set, there are no visible secondaries, or this client was created without the replicaSet option.

New in version 3.0: MongoClient gained this property in version 3.0 when MongoReplicaSetClient’s functionality was merged in.

server_selection_timeout

The server selection timeout for this instance in seconds.

write_concern

Read only access to the WriteConcern of this instance.

Changed in version 3.0: The write_concern attribute is now read only.

MotorClientSession – Sequence of operations

class motor.motor_tornado.MotorClientSession(delegate, motor_client)
coroutine abort_transaction()

Abort a multi-statement transaction.

coroutine commit_transaction()

Commit a multi-statement transaction.

coroutine end_session()

Finish this session. If a transaction has started, abort it.

It is an error to use the session after the session has ended.

start_transaction(read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None)

Start a multi-statement transaction.

Takes the same arguments as TransactionOptions.

Best used in a context manager block:

# Use "await" for start_session, but not for start_transaction.
async with await client.start_session() as s:
    async with s.start_transaction():
        await collection.delete_one({'x': 1}, session=s)
        await collection.insert_one({'x': 2}, session=s)
coroutine with_transaction(coro, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None)

Executes an awaitable in a transaction.

This method starts a transaction on this session, awaits coro once, and then commits the transaction. For example:

async def coro(session):
    orders = session.client.db.orders
    inventory = session.client.db.inventory
    inserted_id = await orders.insert_one(
        {"sku": "abc123", "qty": 100}, session=session)
    await inventory.update_one(
        {"sku": "abc123", "qty": {"$gte": 100}},
        {"$inc": {"qty": -100}}, session=session)
    return inserted_id

async with await client.start_session() as session:
    inserted_id = await session.with_transaction(coro)

To pass arbitrary arguments to the coro, wrap it with a lambda like this:

async def coro(session, custom_arg, custom_kwarg=None):
    # Transaction operations...

async with await client.start_session() as session:
    await session.with_transaction(
        lambda s: coro(s, "custom_arg", custom_kwarg=1))

In the event of an exception, with_transaction may retry the commit or the entire transaction, therefore coro may be awaited multiple times by a single call to with_transaction. Developers should be mindful of this possibility when writing a coro that modifies application state or has any other side-effects. Note that even when the coro is invoked multiple times, with_transaction ensures that the transaction will be committed at-most-once on the server.

The coro should not attempt to start new transactions, but should simply run operations meant to be contained within a transaction. The coro should also not commit the transaction; this is handled automatically by with_transaction. If the coro does commit or abort the transaction without error, however, with_transaction will return without taking further action.

When coro raises an exception, with_transaction automatically aborts the current transaction. When coro or commit_transaction() raises an exception that includes the "TransientTransactionError" error label, with_transaction starts a new transaction and re-executes the coro.

When commit_transaction() raises an exception with the "UnknownTransactionCommitResult" error label, with_transaction retries the commit until the result of the transaction is known.

This method will cease retrying after 120 seconds has elapsed. This timeout is not configurable and any exception raised by the coro or by ClientSession.commit_transaction() after the timeout is reached will be re-raised. Applications that desire a different timeout duration should not use this method.

Parameters:
  • coro: The coroutine to run inside a transaction. The coroutine must accept a single argument, this session. Note, under certain error conditions the coroutine may be run multiple times.
  • read_concern (optional): The ReadConcern to use for this transaction.
  • write_concern (optional): The WriteConcern to use for this transaction.
  • read_preference (optional): The read preference to use for this transaction. If None (the default) the read_preference of this Database is used. See read_preferences for options.
Returns:

The return value of the coro.

New in version 2.1.

advance_cluster_time

Update the cluster time for this session.

Parameters:
  • cluster_time: The cluster_time from another ClientSession instance.
advance_operation_time

Update the operation time for this session.

Parameters:
  • operation_time: The operation_time from another ClientSession instance.
client

The MotorClient this session was created from.

cluster_time

The cluster time returned by the last operation executed in this session.

has_ended

True if this session is finished.

in_transaction

True if this session has an active multi-statement transaction.

New in version 3.10.

operation_time

The operation time returned by the last operation executed in this session.

options

The SessionOptions this session was created with.

session_id

A BSON document, the opaque server session identifier.

MotorDatabase

class motor.motor_tornado.MotorDatabase(client, name, **kwargs)
db[collection_name] || db.collection_name

Get the collection_name MotorCollection of MotorDatabase db.

Raises InvalidName if an invalid collection name is used.

aggregate(pipeline, **kwargs)

Execute an aggregation pipeline on this database.

Introduced in MongoDB 3.6.

The aggregation can be run on a secondary if the client is connected to a replica set and its read_preference is not PRIMARY. The aggregate() method obeys the read_preference of this MotorDatabase, except when $out or $merge are used, in which case PRIMARY is used.

All optional aggregate command parameters should be passed as keyword arguments to this method. Valid options include, but are not limited to:

  • allowDiskUse (bool): Enables writing to temporary files. When set to True, aggregation stages can write data to the _tmp subdirectory of the --dbpath directory. The default is False.
  • maxTimeMS (int): The maximum amount of time to allow the operation to run in milliseconds.
  • batchSize (int): The maximum number of documents to return per batch. Ignored if the connected mongod or mongos does not support returning aggregate results using a cursor.
  • collation (optional): An instance of Collation.

Returns a MotorCommandCursor that can be iterated like a cursor from find():

async def f():
    # Lists all operations currently running on the server.
    pipeline = [{"$currentOp": {}}]
    async for operation in client.admin.aggregate(pipeline):
        print(operation)

Note

This method does not support the ‘explain’ option. Please use MotorDatabase.command() instead.

Note

The MotorDatabase.write_concern of this database is automatically applied to this operation.

New in version 2.1.

coroutine command(command, value=1, check=True, allowable_errors=None, read_preference=None, codec_options=CodecOptions(document_class=dict, tz_aware=False, uuid_representation=UuidRepresentation.PYTHON_LEGACY, unicode_decode_error_handler='strict', tzinfo=None, type_registry=TypeRegistry(type_codecs=[], fallback_encoder=None)), session=None, **kwargs)

Issue a MongoDB command.

Send command command to the database and return the response. If command is a string then the command {command: value} will be sent. Otherwise, command must be a dict and will be sent as-is.

Additional keyword arguments are added to the final command document before it is sent.

For example, a command like {buildinfo: 1} can be sent using:

result = await db.command("buildinfo")

For a command where the value matters, like {collstats: collection_name} we can do:

result = await db.command("collstats", collection_name)

For commands that take additional arguments we can use kwargs. So {filemd5: object_id, root: file_root} becomes:

result = await db.command("filemd5", object_id, root=file_root)
Parameters:
  • command: document representing the command to be issued, or the name of the command (for simple commands only).

    Note

    the order of keys in the command document is significant (the “verb” must come first), so commands which require multiple keys (e.g. findandmodify) should use an instance of SON or a string and kwargs instead of a Python dict.

  • value (optional): value to use for the command verb when command is passed as a string

  • check (optional): check the response for errors, raising OperationFailure if there are any

  • allowable_errors: if check is True, error messages in this list will be ignored by error-checking

  • read_preference: The read preference for this operation. See read_preferences for options.

  • session (optional): a ClientSession, created with start_session().

  • **kwargs (optional): additional keyword arguments will be added to the command document before it is sent

Changed in version 1.2: Added session parameter.

See also

The MongoDB documentation on

commands

coroutine create_collection(name, codec_options=None, read_preference=None, write_concern=None, read_concern=None, session=None, **kwargs)

Create a new Collection in this database.

Normally collection creation is automatic. This method should only be used to specify options on creation. CollectionInvalid will be raised if the collection already exists.

Options should be passed as keyword arguments to this method. Supported options vary with MongoDB release. Some examples include:

  • “size”: desired initial size for the collection (in bytes). For capped collections this size is the max size of the collection.
  • “capped”: if True, this is a capped collection
  • “max”: maximum number of objects if capped (optional)

See the MongoDB documentation for a full list of supported options by server version.

Parameters:
  • name: the name of the collection to create
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Database is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Database is used.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Database is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Database is used.
  • collation (optional): An instance of Collation.
  • session (optional): a ClientSession.
  • **kwargs (optional): additional keyword arguments will be passed as options for the create collection command
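
For example, a minimal sketch that creates a capped collection (the name and size are arbitrary):

async def make_capped_log():
    # A fixed-size collection holding at most 4 MB of documents.
    log = await db.create_collection(
        'log', capped=True, size=4 * 1024 * 1024)
    await log.insert_one({'msg': 'started'})
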
coroutine current_op(include_all=False, session=None)

DEPRECATED: Get information on operations currently running.

Starting with MongoDB 3.6 this helper is obsolete. The functionality provided by this helper is available in MongoDB 3.6+ using the $currentOp aggregation pipeline stage, which can be used with aggregate(). Note that, while this helper can only return a single document limited to a 16MB result, aggregate() returns a cursor avoiding that limitation.

Users of MongoDB versions older than 3.6 can use the currentOp command directly:

# MongoDB 3.2 and 3.4
await client.admin.command("currentOp")

Or query the “inprog” virtual collection:

# MongoDB 2.6 and 3.0
await client.admin["$cmd.sys.inprog"].find_one()
Parameters:
  • include_all (optional): if True also list currently idle operations in the result
  • session (optional): a ClientSession, created with start_session().

Changed in version 2.1: Deprecated, use aggregate() instead.

Changed in version 1.2: Added session parameter.

coroutine dereference(dbref, session=None, **kwargs)

Dereference a DBRef, getting the document it points to.

Raises TypeError if dbref is not an instance of DBRef. Returns a document, or None if the reference does not point to a valid document. Raises ValueError if dbref has a database specified that is different from the current database.

Parameters:
  • dbref: the reference
  • session (optional): a ClientSession.
  • **kwargs (optional): any additional keyword arguments are the same as the arguments to find().
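
A minimal sketch, assuming a hypothetical people collection:

from bson.dbref import DBRef

async def follow_reference():
    result = await db.people.insert_one({'name': 'Ann'})
    doc = await db.dereference(DBRef('people', result.inserted_id))
    print(doc)  # {'_id': ObjectId(...), 'name': 'Ann'}
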
coroutine drop_collection(name_or_collection, session=None)

Drop a collection.

Parameters:
  • name_or_collection: the name of a collection to drop or the collection object itself
  • session (optional): a ClientSession.

Note

The write_concern of this database is automatically applied to this operation when using MongoDB >= 3.4.

get_collection(name, codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a Collection with the given name and options.

Useful for creating a Collection with different codec options, read preference, and/or write concern from this Database.

>>> db.read_preference
Primary()
>>> coll1 = db.test
>>> coll1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> coll2 = db.get_collection(
...     'test', read_preference=ReadPreference.SECONDARY)
>>> coll2.read_preference
Secondary(tag_sets=None)
Parameters:
  • name: The name of the collection - a string.
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Database is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Database is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Database is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Database is used.
coroutine list_collection_names(session=None, filter=None, **kwargs)

Get a list of all the collection names in this database.

For example, to list all non-system collections:

filter = {"name": {"$regex": r"^(?!system\.)"}}
names = await db.list_collection_names(filter=filter)
Parameters:
  • session (optional): a ClientSession, created with start_session().
  • filter (optional): A query document to filter the list of collections returned from the listCollections command.
  • **kwargs (optional): Optional parameters of the listCollections command can be passed as keyword arguments to this method. The supported options differ by server version.

Changed in version 2.1: Added the filter and **kwargs parameters.

New in version 1.2.

coroutine list_collections(session=None, filter=None, **kwargs)

Get a cursor over the collections of this database.

Parameters:
  • session (optional): a ClientSession.
  • filter (optional): A query document to filter the list of collections returned from the listCollections command.
  • **kwargs (optional): Optional parameters of the listCollections command can be passed as keyword arguments to this method. The supported options differ by server version.
Returns:

An instance of CommandCursor.
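
For example, a sketch that prints each collection's name and type (the type field is reported by MongoDB 3.4+):

async def show_collections():
    cursor = await db.list_collections()
    async for info in cursor:
        print(info['name'], info['type'])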

coroutine profiling_info(session=None)

Returns a list containing current profiling information.

Parameters:
  • session (optional): a ClientSession.

See also

The MongoDB documentation on

profiling

coroutine profiling_level(session=None)

Get the database’s current profiling level.

Returns one of (OFF, SLOW_ONLY, ALL).

Parameters:
  • session (optional): a ClientSession.

See also

The MongoDB documentation on

profiling

coroutine set_profiling_level(level, slow_ms=None, session=None)

Set the database’s profiling level.

Parameters:
  • level: Specifies a profiling level, see list of possible values below.
  • slow_ms: Optionally modify the threshold, in milliseconds, at which the profiler considers a query or operation slow. Even if the profiler is off, queries slower than slow_ms are written to the logs.
  • session (optional): a ClientSession.

Possible level values:

Level Setting
OFF Off. No profiling.
SLOW_ONLY On. Only includes slow operations.
ALL On. Includes all operations.

Raises ValueError if level is not one of (OFF, SLOW_ONLY, ALL).
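
For example, a minimal sketch that logs operations slower than 250 milliseconds:

from pymongo import SLOW_ONLY

async def enable_slow_op_profiling():
    await db.set_profiling_level(SLOW_ONLY, slow_ms=250)
    print(await db.profiling_level())  # 1, i.e. SLOW_ONLY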

See also

The MongoDB documentation on

profiling

coroutine validate_collection(name_or_collection, scandata=False, full=False, session=None, background=None)

Validate a collection.

Returns a dict of validation info. Raises CollectionInvalid if validation fails.

See also the MongoDB documentation on the validate command.

Parameters:
  • name_or_collection: A Collection object or the name of a collection to validate.
  • scandata: Do extra checks beyond checking the overall structure of the collection.
  • full: Have the server do a more thorough scan of the collection. Use with scandata for a thorough scan of the structure of the collection and the individual documents.
  • session (optional): a ClientSession.
  • background (optional): A boolean flag that determines whether the command runs in the background. Requires MongoDB 4.4+.
watch(pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None)

Watch changes on this database.

Returns a MotorChangeStream cursor which iterates over changes on this database. Introduced in MongoDB 4.0.

See the documentation for MotorCollection.watch() for more details and examples.

Parameters:
  • pipeline (optional): A list of aggregation pipeline stages to append to an initial $changeStream stage. Not all pipeline stages are valid after a $changeStream stage, see the MongoDB documentation on change streams for the supported stages.
  • full_document (optional): The fullDocument option to pass to the $changeStream stage. Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  • resume_after (optional): A resume token. If provided, the change stream will start returning changes that occur directly after the operation specified in the resume token. A resume token is the _id value of a change document.
  • max_await_time_ms (optional): The maximum time in milliseconds for the server to wait for changes before responding to a getMore operation.
  • batch_size (optional): The maximum number of documents to return per batch.
  • collation (optional): The Collation to use for the aggregation.
  • start_at_operation_time (optional): If provided, the resulting change stream will only return changes that occurred at or after the specified Timestamp. Requires MongoDB >= 4.0.
  • session (optional): a ClientSession.
  • start_after (optional): The same as resume_after except that start_after can resume notifications after an invalidate event. This option and resume_after are mutually exclusive.
Returns:

A MotorChangeStream.

Changed in version 2.1: Added the start_after parameter.

New in version 2.0.

See also

The MongoDB documentation on

changeStreams

with_options(codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a clone of this database changing the specified settings.

>>> db1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> db2 = db1.with_options(read_preference=ReadPreference.SECONDARY)
>>> db1.read_preference
Primary()
>>> db2.read_preference
Secondary(tag_sets=None)
Parameters:
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Collection is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Collection is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Collection is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Collection is used.

New in version 3.8.

client

This MotorDatabase’s MotorClient.

codec_options

Read only access to the CodecOptions of this instance.

incoming_copying_manipulators

DEPRECATED: All incoming SON copying manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

incoming_manipulators

DEPRECATED: All incoming SON manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

name

The name of this Database.

outgoing_copying_manipulators

DEPRECATED: All outgoing SON copying manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

outgoing_manipulators

DEPRECATED: All outgoing SON manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

read_concern

Read only access to the ReadConcern of this instance.

New in version 3.2.

read_preference

Read only access to the read preference of this instance.

Changed in version 3.0: The read_preference attribute is now read only.

write_concern

Read only access to the WriteConcern of this instance.

Changed in version 3.0: The write_concern attribute is now read only.

MotorCollection

class motor.motor_tornado.MotorCollection(database, name, codec_options=None, read_preference=None, write_concern=None, read_concern=None, _delegate=None)
c[name] || c.name

Get the name sub-collection of MotorCollection c.

Raises InvalidName if an invalid collection name is used.

database

The MotorDatabase that this MotorCollection is a part of.

coroutine create_index(keys, **kwargs)

Creates an index on this collection.

Takes either a single key or a list of (key, direction) pairs. The key(s) must be an instance of basestring (str in python 3), and the direction(s) must be one of (ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, HASHED, TEXT).

To create a single key ascending index on the key 'mike' we just use a string argument:

await my_collection.create_index("mike")

For a compound index on 'mike' descending and 'eliot' ascending we need to use a list of tuples:

await my_collection.create_index([("mike", pymongo.DESCENDING),
                                  ("eliot", pymongo.ASCENDING)])

All optional index creation parameters should be passed as keyword arguments to this method. For example:

await my_collection.create_index([("mike", pymongo.DESCENDING)],
                                 background=True)

Valid options include, but are not limited to:

  • name: custom name to use for this index - if none is given, a name will be generated.
  • unique: if True creates a uniqueness constraint on the index.
  • background: if True this index should be created in the background.
  • sparse: if True, omit from the index any documents that lack the indexed field.
  • bucketSize: for use with geoHaystack indexes. Number of documents to group together within a certain proximity to a given longitude and latitude.
  • min: minimum value for keys in a GEO2D index.
  • max: maximum value for keys in a GEO2D index.
  • expireAfterSeconds: <int> Used to create an expiring (TTL) collection. MongoDB will automatically delete documents from this collection after <int> seconds. The indexed field must be a UTC datetime or the data will not expire.
  • partialFilterExpression: A document that specifies a filter for a partial index.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.

See the MongoDB documentation for a full list of supported options by server version.

Warning

dropDups is not supported by MongoDB 3.0 or newer. The option is silently ignored by the server and unique index builds using the option will fail if a duplicate value is detected.

Note

partialFilterExpression requires server version >= 3.2

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.

Parameters:
  • keys: a single key or a list of (key, direction) pairs specifying the index to create
  • **kwargs (optional): any additional index creation options (see the above list) should be passed as keyword arguments

Returns a Future.

See also

The MongoDB documentation on

indexes

coroutine inline_map_reduce(map, reduce, full_response=False, **kwargs)

Perform an inline map/reduce operation on this collection.

Perform the map/reduce operation on the server in RAM. A result collection is not created. The result set is returned as a list of documents.

If full_response is False (default) returns the result documents in a list. Otherwise, returns the full response from the server to the map reduce command.

The inline_map_reduce() method obeys the read_preference of this Collection.

Parameters:
  • map: map function (as a JavaScript string)

  • reduce: reduce function (as a JavaScript string)

  • full_response (optional): if True, return full response to this command - otherwise just return the result collection

  • **kwargs (optional): additional arguments to the map reduce command may be passed as keyword arguments to this helper method, e.g.:

    await db.test.inline_map_reduce(map, reduce, limit=2)
    

Returns a Future.

See also

The MongoDB documentation on

mapreduce
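
As a fuller sketch, counting occurrences of each value of a hypothetical tag field:

async def count_tags():
    mapper = "function () { emit(this.tag, 1); }"
    reducer = "function (key, values) { return Array.sum(values); }"
    results = await db.test.inline_map_reduce(mapper, reducer)
    for doc in results:
        print(doc['_id'], doc['value'])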

aggregate(pipeline, **kwargs)

Execute an aggregation pipeline on this collection.

The aggregation can be run on a secondary if the client is connected to a replica set and its read_preference is not PRIMARY.

Parameters:
  • pipeline: a single command or list of aggregation commands
  • session (optional): a ClientSession, created with start_session().
  • **kwargs: send arbitrary parameters to the aggregate command

Returns a MotorCommandCursor that can be iterated like a cursor from find():

async def f():
    pipeline = [{'$project': {'name': {'$toUpper': '$name'}}}]
    async for doc in collection.aggregate(pipeline):
        print(doc)

MotorCommandCursor does not allow the explain option. To explain MongoDB’s query plan for the aggregation, use MotorDatabase.command():

async def f():
    plan = await db.command(
        'aggregate', 'COLLECTION-NAME',
        pipeline=[{'$project': {'x': 1}}],
        explain=True)

    print(plan)

Changed in version 2.1: This collection’s read concern is now applied to pipelines containing the $out stage when connected to MongoDB >= 4.2.

Changed in version 1.0: aggregate() now always returns a cursor.

Changed in version 0.5: aggregate() now returns a cursor by default, and the cursor is returned immediately without an await. See aggregation changes in Motor 0.5.

Changed in version 0.2: Added cursor support.

aggregate_raw_batches(pipeline, **kwargs)

Perform an aggregation and retrieve batches of raw BSON.

Similar to the aggregate() method but returns each batch as bytes.

This example demonstrates how to work with raw batches, but in practice raw batches should be passed to an external library that can decode BSON into another data type, rather than used with PyMongo’s bson module.

import bson

async def get_raw():
    # aggregate_raw_batches requires a pipeline; $project is a minimal example.
    cursor = db.test.aggregate_raw_batches([{'$project': {'x': 1}}])
    async for batch in cursor:
        print(bson.decode_all(batch))

Note that aggregate_raw_batches does not support sessions.

New in version 2.0.

coroutine bulk_write(requests, ordered=True, bypass_document_validation=False, session=None)

Send a batch of write operations to the server.

Requests are passed as a list of write operation instances imported from pymongo (InsertOne, UpdateOne, UpdateMany, ReplaceOne, DeleteOne, or DeleteMany).

For example, say we have these documents:

{'x': 1, '_id': ObjectId('54f62e60fba5226811f634ef')}
{'x': 1, '_id': ObjectId('54f62e60fba5226811f634f0')}

We can insert a document, delete one, and replace one like so:

# DeleteMany, UpdateOne, and UpdateMany are also available.
from pymongo import InsertOne, DeleteOne, ReplaceOne

async def modify_data():
    requests = [InsertOne({'y': 1}), DeleteOne({'x': 1}),
                ReplaceOne({'w': 1}, {'z': 1}, upsert=True)]
    result = await db.test.bulk_write(requests)

    print("inserted %d, deleted %d, modified %d" % (
        result.inserted_count, result.deleted_count, result.modified_count))

    print("upserted_ids: %s" % result.upserted_ids)

    print("collection:")
    async for doc in db.test.find():
        print(doc)

This will print something like:

inserted 1, deleted 1, modified 0
upserted_ids: {2: ObjectId('54f62ee28891e756a6e1abd5')}

collection:
{'x': 1, '_id': ObjectId('54f62e60fba5226811f634f0')}
{'y': 1, '_id': ObjectId('54f62ee2fba5226811f634f1')}
{'z': 1, '_id': ObjectId('54f62ee28891e756a6e1abd5')}
Parameters:
  • requests: A list of write operations (see examples above).
  • ordered (optional): If True (the default) requests will be performed on the server serially, in the order provided. If an error occurs all remaining operations are aborted. If False requests will be performed on the server in arbitrary order, possibly in parallel, and all operations will be attempted.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of BulkWriteResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 1.2: Added session parameter.

coroutine count_documents(filter, session=None, **kwargs)

Count the number of documents in this collection.

Note

For a fast count of the total documents in a collection see estimated_document_count().

The count_documents() method is supported in a transaction.

All optional parameters should be passed as keyword arguments to this method. Valid options include:

  • skip (int): The number of matching documents to skip before returning results.
  • limit (int): The maximum number of documents to count. Must be a positive integer. If not provided, no limit is imposed.
  • maxTimeMS (int): The maximum amount of time to allow this operation to run, in milliseconds.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (string or list of tuples): The index to use. Specify either the index name as a string or the index specification as a list of tuples (e.g. [(‘a’, pymongo.ASCENDING), (‘b’, pymongo.ASCENDING)]). This option is only supported on MongoDB 3.6 and above.

The count_documents() method obeys the read_preference of this Collection.

Note

When migrating from count() to count_documents() the following query operators must be replaced:

Operator Replacement
$where $expr
$near $geoWithin with $center
$nearSphere $geoWithin with $centerSphere

$expr requires MongoDB 3.6+

Parameters:
  • filter (required): A query document that selects which documents to count in the collection. Can be an empty document to count all documents.
  • session (optional): a ClientSession.
  • **kwargs (optional): See list of options above.
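
For example, a sketch that counts at most 10 matching documents, giving the server half a second:

async def count_some():
    n = await db.test.count_documents(
        {'x': {'$gt': 0}}, limit=10, maxTimeMS=500)
    print(n)
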
coroutine create_indexes(indexes, session=None, **kwargs)

Create one or more indexes on this collection:

from pymongo import IndexModel, ASCENDING, DESCENDING

async def create_two_indexes():
    index1 = IndexModel([("hello", DESCENDING),
                         ("world", ASCENDING)], name="hello_world")
    index2 = IndexModel([("goodbye", DESCENDING)])
    print(await db.test.create_indexes([index1, index2]))

This prints:

['hello_world', 'goodbye_-1']
Parameters:
  • indexes: A list of IndexModel instances.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): optional arguments to the createIndexes command (like maxTimeMS) can be passed as keyword arguments.

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.

Changed in version 1.2: Added session parameter.

coroutine delete_many(filter, collation=None, hint=None, session=None)

Delete one or more documents matching the filter.

If we have a collection with 3 documents like {'x': 1}, then:

async def clear_collection():
    result = await db.test.delete_many({'x': 1})
    print(result.deleted_count)

This deletes all matching documents and prints “3”.

Parameters:
  • filter: A query that matches the documents to delete.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (optional): An index used to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of DeleteResult.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine delete_one(filter, collation=None, hint=None, session=None)

Delete a single document matching the filter.

If we have a collection with 3 documents like {'x': 1}, then:

async def clear_collection():
    result = await db.test.delete_one({'x': 1})
    print(result.deleted_count)

This deletes one matching document and prints “1”.

Parameters:
  • filter: A query that matches the document to delete.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (optional): An index used to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of DeleteResult.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine distinct(key, filter=None, session=None, **kwargs)

Get a list of distinct values for key among all documents in this collection.

Raises TypeError if key is not an instance of basestring (str in python 3).

All optional distinct parameters should be passed as keyword arguments to this method. Valid options include:

  • maxTimeMS (int): The maximum amount of time to allow the distinct command to run, in milliseconds.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.

The distinct() method obeys the read_preference of this Collection.

Parameters:
  • key: name of the field for which we want to get the distinct values
  • filter (optional): A query document that specifies the documents from which to retrieve the distinct values.
  • session (optional): a ClientSession.
  • **kwargs (optional): See list of options above.
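
A minimal sketch:

async def show_distinct():
    await db.test.insert_many([{'x': 1}, {'x': 1}, {'x': 2}])
    print(await db.test.distinct('x'))  # [1, 2]
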
coroutine drop(session=None)

Alias for drop_collection.

The following two calls are equivalent:

await db.foo.drop()
await db.drop_collection("foo")
coroutine drop_index(index_or_name, session=None, **kwargs)

Drops the specified index on this collection.

Can be used on non-existent collections or collections with no indexes. Raises OperationFailure on an error (e.g. trying to drop an index that does not exist). index_or_name can be either an index name (as returned by create_index), or an index specifier (as passed to create_index). An index specifier should be a list of (key, direction) pairs. Raises TypeError if index is not an instance of (str, unicode, list).

Warning

if a custom name was used on index creation (by passing the name parameter to create_index() or ensure_index()) the index must be dropped by name.

Parameters:
  • index_or_name: index (or name of index) to drop
  • session (optional): a ClientSession.
  • **kwargs (optional): optional arguments to the dropIndexes command (like maxTimeMS) can be passed as keyword arguments.

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.
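
A minimal sketch, dropping an index by the name returned from create_index():

async def create_then_drop():
    name = await db.test.create_index('x')  # 'x_1'
    await db.test.drop_index(name)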

coroutine drop_indexes(session=None, **kwargs)

Drops all indexes on this collection.

Can be used on non-existent collections or collections with no indexes. Raises OperationFailure on an error.

Parameters:
  • session (optional): a ClientSession.
  • **kwargs (optional): optional arguments to the dropIndexes command (like maxTimeMS) can be passed as keyword arguments.

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.

coroutine estimated_document_count(**kwargs)

Get an estimate of the number of documents in this collection using collection metadata.

The estimated_document_count() method is not supported in a transaction.

All optional parameters should be passed as keyword arguments to this method. Valid options include:

  • maxTimeMS (int): The maximum amount of time to allow this operation to run, in milliseconds.
Parameters:
  • **kwargs (optional): See list of options above.
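
For example, a sketch with a 500 millisecond server-side time limit:

async def quick_count():
    print(await db.test.estimated_document_count(maxTimeMS=500))
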
find(*args, **kwargs)

Create a MotorCursor. Same parameters as for PyMongo’s find().

Note that find does not require an await expression, because find merely creates a MotorCursor without performing any operations on the server. MotorCursor methods such as to_list() perform actual operations.
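
For example, a sketch that builds a cursor without any I/O, then fetches results with to_list():

async def first_ten():
    cursor = db.test.find({'x': {'$gt': 0}}).sort('x').limit(10)
    docs = await cursor.to_list(length=10)
    print(docs)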

coroutine find_one(filter=None, *args, **kwargs)

Get a single document from the database.

All arguments to find() are also valid arguments for find_one(), although any limit argument will be ignored. Returns a single document, or None if no matching document is found.

The find_one() method obeys the read_preference of this Motor collection instance.

Parameters:
  • filter (optional): a dictionary specifying the query to be performed OR any other type to be used as the value for a query for "_id".
  • *args (optional): any additional positional arguments are the same as the arguments to find().
  • **kwargs (optional): any additional keyword arguments are the same as the arguments to find().
  • max_time_ms (optional): a value for max_time_ms may be specified as part of **kwargs, e.g.:
await collection.find_one(max_time_ms=100)

Changed in version 1.2: Added session parameter.

coroutine find_one_and_delete(filter, projection=None, sort=None, hint=None, session=None, **kwargs)

Finds a single document and deletes it, returning the document.

If we have a collection with 2 documents like {'x': 1}, then this code retrieves and deletes one of them:

async def delete_one_document():
    print(await db.test.count_documents({'x': 1}))
    doc = await db.test.find_one_and_delete({'x': 1})
    print(doc)
    print(await db.test.count_documents({'x': 1}))

This outputs something like:

2
{'x': 1, '_id': ObjectId('54f4e12bfba5220aa4d6dee8')}
1

If multiple documents match filter, a sort can be applied. Say we have 3 documents like:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

This code retrieves and deletes the document with the largest _id:

async def delete_with_largest_id():
    doc = await db.test.find_one_and_delete(
        {'x': 1}, sort=[('_id', pymongo.DESCENDING)])
    print(doc)

This deletes one document and prints it:

{'x': 1, '_id': 2}

The projection option can be used to limit the fields returned:

async def delete_and_return_x():
    print(await db.test.find_one_and_delete(
        {'x': 1}, projection={'_id': False}))

This prints:

{'x': 1}
Parameters:
  • filter: A query that matches the document to delete.
  • projection (optional): a list of field names that should be returned in the result document or a mapping specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a mapping to exclude fields from the result (e.g. projection={‘_id’: False}).
  • sort (optional): a list of (key, direction) pairs specifying the sort order for the query. If multiple documents match the query, they are sorted and the first is deleted.
  • hint (optional): An index used to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): additional command arguments can be passed as keyword arguments (for example maxTimeMS can be used with recent server versions).

This command uses the WriteConcern of this Collection when connected to MongoDB >= 3.2. Note that using an elevated write concern with this command may be slower compared to using the default write concern.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine find_one_and_replace(filter, replacement, projection=None, sort=None, upsert=False, return_document=False, hint=None, session=None, **kwargs)

Finds a single document and replaces it, returning either the original or the replaced document.

The find_one_and_replace() method differs from find_one_and_update() by replacing the document matched by filter, rather than modifying the existing document.

Say we have 3 documents like:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

Replace one of them like so:

async def replace_one_doc():
    original_doc = await db.test.find_one_and_replace({'x': 1}, {'y': 1})
    print("original: %s" % original_doc)
    print("collection:")
    async for doc in db.test.find():
        print(doc)

This will print:

original: {'x': 1, '_id': 0}
collection:
{'y': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}
Parameters:
  • filter: A query that matches the document to replace.
  • replacement: The replacement document.
  • projection (optional): A list of field names that should be returned in the result document or a mapping specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a mapping to exclude fields from the result (e.g. projection={‘_id’: False}).
  • sort (optional): a list of (key, direction) pairs specifying the sort order for the query. If multiple documents match the query, they are sorted and the first is replaced.
  • upsert (optional): When True, inserts a new document if no document matches the query. Defaults to False.
  • return_document: If ReturnDocument.BEFORE (the default), returns the original document before it was replaced, or None if no document matches. If ReturnDocument.AFTER, returns the replaced or inserted document.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): additional command arguments can be passed as keyword arguments (for example maxTimeMS can be used with recent server versions).

This command uses the WriteConcern of this Collection when connected to MongoDB >= 3.2. Note that using an elevated write concern with this command may be slower compared to using the default write concern.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine find_one_and_update(filter, update, projection=None, sort=None, upsert=False, return_document=False, array_filters=None, hint=None, session=None, **kwargs)

Finds a single document and updates it, returning either the original or the updated document. By default find_one_and_update() returns the original version of the document before the update was applied:

async def set_done():
    print(await db.test.find_one_and_update(
        {'_id': 665}, {'$inc': {'count': 1}, '$set': {'done': True}}))

This outputs:

{'_id': 665, 'done': False, 'count': 25}

To return the updated version of the document instead, use the return_document option.

from pymongo import ReturnDocument

async def increment_by_userid():
    print(await db.example.find_one_and_update(
        {'_id': 'userid'},
        {'$inc': {'seq': 1}},
        return_document=ReturnDocument.AFTER))

This prints:

{'_id': 'userid', 'seq': 1}

You can limit the fields returned with the projection option.

async def increment_by_userid():
    print(await db.example.find_one_and_update(
        {'_id': 'userid'},
        {'$inc': {'seq': 1}},
        projection={'seq': True, '_id': False},
        return_document=ReturnDocument.AFTER))

This results in:

{'seq': 2}

The upsert option can be used to create the document if it doesn’t already exist.

async def increment_by_userid():
    print(await db.example.find_one_and_update(
        {'_id': 'userid'},
        {'$inc': {'seq': 1}},
        projection={'seq': True, '_id': False},
        upsert=True,
        return_document=ReturnDocument.AFTER))

The result:

{'seq': 1}

If multiple documents match filter, a sort can be applied. Say we have these two documents:

{'_id': 665, 'done': True, 'result': {'count': 26}}
{'_id': 701, 'done': True, 'result': {'count': 17}}

Then to update the one with the greatest _id:

async def set_done():
    print(await db.test.find_one_and_update(
        {'done': True},
        {'$set': {'final': True}},
        sort=[('_id', pymongo.DESCENDING)]))

This would print:

{'_id': 701, 'done': True, 'result': {'count': 17}}
Parameters:
  • filter: A query that matches the document to update.
  • update: The update operations to apply.
  • projection (optional): A list of field names that should be returned in the result document or a mapping specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a dict to exclude fields from the result (e.g. projection={‘_id’: False}).
  • sort (optional): a list of (key, direction) pairs specifying the sort order for the query. If multiple documents match the query, they are sorted and the first is updated.
  • upsert (optional): When True, inserts a new document if no document matches the query. Defaults to False.
  • return_document: If ReturnDocument.BEFORE (the default), returns the original document before it was updated, or None if no document matches. If ReturnDocument.AFTER, returns the updated or inserted document.
  • array_filters (optional): A list of filters specifying which array elements an update should apply. Requires MongoDB 3.6+.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): additional command arguments can be passed as keyword arguments (for example maxTimeMS can be used with recent server versions).

This command uses the WriteConcern of this Collection when connected to MongoDB >= 3.2. Note that using an elevated write concern with this command may be slower compared to using the default write concern.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added array_filters and session parameters.

find_raw_batches(*args, **kwargs)

Query the database and retrieve batches of raw BSON.

Similar to the find() method but returns each batch as bytes.

This example demonstrates how to work with raw batches, but in practice raw batches should be passed to an external library that can decode BSON into another data type, rather than used with PyMongo’s bson module.

import bson

async def get_raw():
    cursor = db.test.find_raw_batches()
    async for batch in cursor:
        print(bson.decode_all(batch))

Note that find_raw_batches does not support sessions.

New in version 2.0.

coroutine index_information(session=None)

Get information on this collection’s indexes.

Returns a dictionary where the keys are index names (as returned by create_index()) and the values are dictionaries containing information about each index. The dictionary is guaranteed to contain at least a single key, "key" which is a list of (key, direction) pairs specifying the index (as passed to create_index()). It will also contain any other metadata about the indexes, except for the "ns" and "name" keys, which are cleaned. For example:

async def create_x_index():
    print(await db.test.create_index("x", unique=True))
    print(await db.test.index_information())

This prints:

'x_1'
{'_id_': {'key': [('_id', 1)]},
 'x_1': {'unique': True, 'key': [('x', 1)]}}

Changed in version 1.2: Added session parameter.

coroutine insert_many(documents, ordered=True, bypass_document_validation=False, session=None)

Insert an iterable of documents.

async def insert_2_docs():
    result = await db.test.insert_many([{'x': i} for i in range(2)])
    print(result.inserted_ids)

This prints something like:

[ObjectId('54f113fffba522406c9cc20e'), ObjectId('54f113fffba522406c9cc20f')]
Parameters:
  • documents: An iterable of documents to insert.
  • ordered (optional): If True (the default) documents will be inserted on the server serially, in the order provided. If an error occurs all remaining inserts are aborted. If False, documents will be inserted on the server in arbitrary order, possibly in parallel, and all document inserts will be attempted.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of InsertManyResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 1.2: Added session parameter.

coroutine insert_one(document, bypass_document_validation=False, session=None)

Insert a single document.

async def insert_x():
    result = await db.test.insert_one({'x': 1})
    print(result.inserted_id)

This code outputs the new document’s _id:

ObjectId('54f112defba522406c9cc208')
Parameters:
  • document: The document to insert. Must be a mutable mapping type. If the document does not have an _id field one will be added automatically.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of InsertOneResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 1.2: Added session parameter.

list_indexes(session=None)

Get a cursor over the index documents for this collection.

async def print_indexes():
    async for index in db.test.list_indexes():
        print(index)

If the only index is the default index on _id, this might print:

SON([('v', 1), ('key', SON([('_id', 1)])), ('name', '_id_')])
coroutine map_reduce(map, reduce, out, full_response=False, session=None, **kwargs)

Perform a map/reduce operation on this collection.

If full_response is False (default) returns a MotorCollection instance containing the results of the operation. Otherwise, returns the full response from the server to the map reduce command.

Parameters:
  • map: map function (as a JavaScript string)

  • reduce: reduce function (as a JavaScript string)

  • out: output collection name or out object (dict). See the map reduce command documentation for available options. Note: out options are order sensitive. SON can be used to specify multiple options. e.g. SON([(‘replace’, <collection name>), (‘db’, <database name>)])

  • full_response (optional): if True, return full response to this command - otherwise just return the result collection

  • session (optional): a ClientSession, created with start_session().

  • **kwargs (optional): additional arguments to the map reduce command may be passed as keyword arguments to this helper method, e.g.:

    result = await db.test.map_reduce(map, reduce, "myresults", limit=2)
    

Returns a Future.

Note

The map_reduce() method does not obey the read_preference of this MotorCollection. To run mapReduce on a secondary use the inline_map_reduce() method instead.

See also

The MongoDB documentation on

mapreduce

Changed in version 1.2: Added session parameter.

coroutine options(session=None)

Get the options set on this collection.

Returns a dictionary of options and their values - see create_collection() for more information on the possible options. Returns an empty dictionary if the collection has not been created yet.

Parameters:
  • session (optional): a ClientSession.
coroutine reindex(session=None, **kwargs)

DEPRECATED: Rebuild all indexes on this collection.

Deprecated. Use command() to run the reIndex command directly instead:

await db.command({"reIndex": "<collection_name>"})

Note

Starting in MongoDB 4.6, the reIndex command can only be run when connected to a standalone mongod.

Parameters:
  • session (optional): a MotorClientSession.
  • **kwargs (optional): optional arguments to the reIndex command (like maxTimeMS) can be passed as keyword arguments.

Warning

reindex blocks all other operations (indexes are built in the foreground) and will be slow for large collections.

Changed in version 2.2: Deprecated.

coroutine rename(new_name, session=None, **kwargs)

Rename this collection.

If operating in auth mode, client must be authorized as an admin to perform this operation. Raises TypeError if new_name is not an instance of basestring (str in python 3). Raises InvalidName if new_name is not a valid collection name.

Parameters:
  • new_name: new name for this collection
  • session (optional): a ClientSession.
  • **kwargs (optional): additional arguments to the rename command may be passed as keyword arguments to this helper method (i.e. dropTarget=True)

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.
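
A minimal sketch (the collection names are hypothetical):

async def promote_staging():
    # dropTarget=True overwrites any existing "prod" collection.
    await db.staging.rename('prod', dropTarget=True)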

coroutine replace_one(filter, replacement, upsert=False, bypass_document_validation=False, collation=None, hint=None, session=None)

Replace a single document matching the filter.

Say our collection has one document:

{'x': 1, '_id': ObjectId('54f4c5befba5220aa4d6dee7')}

Then to replace it with another:

async def replace_x_with_y():
    result = await db.test.replace_one({'x': 1}, {'y': 1})
    print('matched %d, modified %d' %
        (result.matched_count, result.modified_count))

    print('collection:')
    async for doc in db.test.find():
        print(doc)

This prints:

matched 1, modified 1
collection:
{'y': 1, '_id': ObjectId('54f4c5befba5220aa4d6dee7')}

The upsert option can be used to insert a new document if a matching document does not exist:

async def replace_or_upsert():
    result = await db.test.replace_one({'x': 1}, {'x': 1}, True)
    print('matched %d, modified %d, upserted_id %r' %
        (result.matched_count, result.modified_count, result.upserted_id))

    print('collection:')
    async for doc in db.test.find():
        print(doc)

This prints:

matched 0, modified 0, upserted_id ObjectId('54f11e5c8891e756a6e1abd4')
collection:
{'y': 1, '_id': ObjectId('54f4c5befba5220aa4d6dee7')}
{'x': 1, '_id': ObjectId('54f11e5c8891e756a6e1abd4')}
Parameters:
  • filter: A query that matches the document to replace.
  • replacement: The new document.
  • upsert (optional): If True, perform an insert if no documents match the filter.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.2 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of UpdateResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine update_many(filter, update, upsert=False, array_filters=None, bypass_document_validation=False, collation=None, hint=None, session=None)

Update one or more documents that match the filter.

Say our collection has 3 documents:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

We can add 3 to each “x” field:

async def add_3_to_x():
  result = await db.test.update_many({'x': 1}, {'$inc': {'x': 3}})
  print('matched %d, modified %d' % 
        (result.matched_count, result.modified_count))

  print('collection:')
  async for doc in db.test.find():
      print(doc)

This prints:

matched 3, modified 3
collection:
{'x': 4, '_id': 0}
{'x': 4, '_id': 1}
{'x': 4, '_id': 2}
Parameters:
  • filter: A query that matches the documents to update.
  • update: The modifications to apply.
  • upsert (optional): If True, perform an insert if no documents match the filter.
  • bypass_document_validation (optional): If True, allows the write to opt-out of document level validation. Default is False.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • array_filters (optional): A list of filters specifying which array elements an update should apply. Requires MongoDB 3.6+.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.2 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of UpdateResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added array_filters and session parameters.

coroutine update_one(filter, update, upsert=False, bypass_document_validation=False, collation=None, array_filters=None, hint=None, session=None)

Update a single document matching the filter.

Say our collection has 3 documents:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

We can add 3 to the “x” field of one of the documents:

async def add_3_to_x():
  result = await db.test.update_one({'x': 1}, {'$inc': {'x': 3}})
  print('matched %d, modified %d' %
        (result.matched_count, result.modified_count))

  print('collection:')
  async for doc in db.test.find():
      print(doc)

This prints:

matched 1, modified 1
collection:
{'x': 4, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}
Parameters:
  • filter: A query that matches the document to update.
  • update: The modifications to apply.
  • upsert (optional): If True, perform an insert if no documents match the filter.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • array_filters (optional): A list of filters specifying which array elements an update should apply. Requires MongoDB 3.6+.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.2 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of UpdateResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added array_filters and session parameters.

watch(pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None)

Watch changes on this collection.

Performs an aggregation with an implicit initial $changeStream stage and returns a MotorChangeStream cursor which iterates over changes on this collection.

Introduced in MongoDB 3.6.

A change stream continues waiting indefinitely for matching change events. Code like the following allows a program to cancel the change stream and exit.

change_stream = None

async def watch_collection():
    global change_stream

    # Using the change stream in an "async with" block
    # ensures it is canceled promptly if your code breaks
    # from the loop or throws an exception.
    async with db.collection.watch() as change_stream:
        async for change in change_stream:
            print(change)

# Tornado
from tornado.ioloop import IOLoop

def main():
    loop = IOLoop.current()
    # Start watching collection for changes.
    loop.add_callback(watch_collection)
    try:
        loop.start()
    except KeyboardInterrupt:
        pass
    finally:
        if change_stream is not None:
            change_stream.close()

# asyncio
from asyncio import get_event_loop

def main():
    loop = get_event_loop()
    task = loop.create_task(watch_collection())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        if change_stream is not None:
            change_stream.close()

        # Prevent "Task was destroyed but it is pending!"
        loop.run_until_complete(task)

The MotorChangeStream async iterable blocks until the next change document is returned or an error is raised. If the next() method encounters a network error when retrieving a batch from the server, it will automatically attempt to recreate the cursor such that no change events are missed. Any error encountered during the resume attempt indicates there may be an outage and will be raised.

try:
    pipeline = [{'$match': {'operationType': 'insert'}}]
    async with db.collection.watch(pipeline) as stream:
        async for change in stream:
            print(change)
except pymongo.errors.PyMongoError:
    # The ChangeStream encountered an unrecoverable error or the
    # resume attempt failed to recreate the cursor.
    logging.error('...')

For a precise description of the resume process see the change streams specification.

Parameters:
  • pipeline (optional): A list of aggregation pipeline stages to append to an initial $changeStream stage. Not all pipeline stages are valid after a $changeStream stage, see the MongoDB documentation on change streams for the supported stages.
  • full_document (optional): The fullDocument option to pass to the $changeStream stage. Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  • resume_after (optional): A resume token. If provided, the change stream will start returning changes that occur directly after the operation specified in the resume token. A resume token is the _id value of a change document.
  • max_await_time_ms (optional): The maximum time in milliseconds for the server to wait for changes before responding to a getMore operation.
  • batch_size (optional): The maximum number of documents to return per batch.
  • collation (optional): The Collation to use for the aggregation.
  • session (optional): a ClientSession.
  • start_after (optional): The same as resume_after except that start_after can resume notifications after an invalidate event. This option and resume_after are mutually exclusive.
Returns:

A MotorChangeStream.

See the Tornado Change Stream Example.

Changed in version 2.1: Added the start_after parameter.

New in version 1.2.

See also

The MongoDB documentation on

changeStreams

with_options(codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a clone of this collection changing the specified settings.

>>> coll1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> coll2 = coll1.with_options(read_preference=ReadPreference.SECONDARY)
>>> coll1.read_preference
Primary()
>>> coll2.read_preference
Secondary(tag_sets=None)
Parameters:
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Collection is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Collection is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Collection is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Collection is used.
codec_options

Read only access to the CodecOptions of this instance.

full_name

The full name of this Collection.

The full name is of the form database_name.collection_name.

name

The name of this Collection.

read_concern

Read only access to the ReadConcern of this instance.

New in version 3.2.

read_preference

Read only access to the read preference of this instance.

Changed in version 3.0: The read_preference attribute is now read only.

write_concern

Read only access to the WriteConcern of this instance.

Changed in version 3.0: The write_concern attribute is now read only.

MotorChangeStream

class motor.motor_tornado.MotorChangeStream(target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, start_at_operation_time, session, start_after)
close()

Close this change stream.

Stops any “async for” loops using this change stream.

next()

Advance the cursor.

This method blocks until the next change document is returned or an unrecoverable error is raised. This method is used when iterating over all changes in the cursor. For example:

async def watch_collection():
    resume_token = None
    pipeline = [{'$match': {'operationType': 'insert'}}]
    try:
        async with db.collection.watch(pipeline) as stream:
            async for insert_change in stream:
                print(insert_change)
                resume_token = stream.resume_token
    except pymongo.errors.PyMongoError:
        # The ChangeStream encountered an unrecoverable error or the
        # resume attempt failed to recreate the cursor.
        if resume_token is None:
            # There is no usable resume token because there was a
            # failure during ChangeStream initialization.
            logging.error('...')
        else:
            # Use the interrupted ChangeStream's resume token to
            # create a new ChangeStream. The new stream will
            # continue from the last seen insert change without
            # missing any events.
            async with db.collection.watch(
                    pipeline, resume_after=resume_token) as stream:
                async for insert_change in stream:
                    print(insert_change)

Raises StopAsyncIteration if this change stream is closed.

In addition to using an “async for” loop as shown in the code example above, you can also iterate the change stream by calling await change_stream.next() repeatedly.

try_next()

Advance the cursor without blocking indefinitely.

This method returns the next change document without waiting indefinitely for the next change. If no changes are available, it returns None. For example:

while change_stream.alive:
    change = await change_stream.try_next()
    # Note that the ChangeStream's resume token may be updated
    # even when no changes are returned.
    print("Current resume token: %r" % (change_stream.resume_token,))
    if change is not None:
        print("Change document: %r" % (change,))
        continue
    # We end up here when there are no recent changes.
    # Sleep for a while before trying again to avoid flooding
    # the server with getMore requests when no changes are
    # available.
    await asyncio.sleep(10)

If no change document is cached locally then this method runs a single getMore command. If the getMore yields any documents, the next document is returned, otherwise, if the getMore returns no documents (because there have been no changes) then None is returned.

Returns:The next change document or None when no document is available after running a single getMore or when the cursor is closed.

New in version 2.1.

alive

Does this cursor have the potential to return more data?

Note

Even if alive is True, next() can raise StopAsyncIteration and try_next() can return None.

resume_token

The cached resume token that will be used to resume after the most recently returned change.

New in version 3.9.

MotorCursor

class motor.motor_tornado.MotorCursor(cursor, collection)

Don’t construct a cursor yourself, but acquire one from methods like MotorCollection.find() or MotorCollection.aggregate().

Note

There is no need to manually close cursors; they are closed by the server after being fully iterated with to_list(), each(), or async for, or automatically closed by the client when the MotorCursor is cleaned up by the garbage collector.

add_option(mask)

Set arbitrary query flags using a bitmask.

To set the tailable flag: cursor.add_option(2)

allow_disk_use(allow_disk_use)

Specifies whether MongoDB can use temporary disk files while processing a blocking sort operation.

Raises TypeError if allow_disk_use is not a boolean.

Note

allow_disk_use requires server version >= 4.4

Parameters:
  • allow_disk_use: if True, MongoDB may use temporary disk files to store data exceeding the system memory limit while processing a blocking sort operation.
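
For example, a minimal sketch, assuming documents with a hypothetical unindexed field named score (collection and pymongo are assumed as in the other examples on this page):

async def f():
    # The server may spill the blocking sort on the unindexed
    # 'score' field to temporary disk files (MongoDB >= 4.4).
    cursor = collection.find().sort('score', pymongo.DESCENDING)
    cursor.allow_disk_use(True)
    async for doc in cursor:
        print(doc)
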
clone()

Get a clone of this cursor.

close()

Explicitly kill this cursor on the server.

Call like:

await cursor.close()
collation(collation)

Adds a Collation to this query.

This option is only supported on MongoDB 3.4 and above.

Raises TypeError if collation is not an instance of Collation or a dict. Raises InvalidOperation if this Cursor has already been used. Only the last collation applied to this cursor has any effect.

Parameters:
  • collation: An instance of Collation.
comment(comment)

Adds a ‘comment’ to the cursor.

http://docs.mongodb.org/manual/reference/operator/comment/

Parameters:
  • comment: A string to attach to the query to help interpret and trace the operation in the server logs and in profile data.
coroutine distinct(key)

Get a list of distinct values for key among all documents in the result set of this query.

Raises TypeError if key is not an instance of basestring (str in python 3).

The distinct() method obeys the read_preference of the Collection instance on which find() was called.

Parameters:
  • key: name of key for which we want to get the distinct values
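
For example, a minimal sketch ('active' and 'category' are hypothetical field names):

async def f():
    # Distinct values of 'category' among documents matching the query.
    categories = await collection.find({'active': True}).distinct('category')
    print(categories)
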
each(callback)

Iterates over all the documents for this cursor.

each() returns immediately, and callback is executed asynchronously for each document. callback is passed (None, None) when iteration is complete.

Cancel iteration early by returning False from the callback. (Only False cancels iteration: returning None or 0 does not.)

>>> def each(result, error):
...     if error:
...         raise error
...     elif result:
...         sys.stdout.write(str(result['_id']) + ', ')
...     else:
...         # Iteration complete
...         IOLoop.current().stop()
...         print('done')
...
>>> cursor = collection.find().sort([('_id', 1)])
>>> cursor.each(callback=each)
>>> IOLoop.current().start()
0, 1, 2, 3, 4, done

Note

Unlike other Motor methods, each requires a callback and does not return a Future, so it cannot be used in a coroutine. async for and to_list() are much easier to use.

Parameters:
  • callback: function taking (document, error)
coroutine explain()

Returns an explain plan record for this cursor.

Note

Starting with MongoDB 3.2 explain() uses the default verbosity mode of the explain command, allPlansExecution. To use a different verbosity use command() to run the explain command directly.

See also

The MongoDB documentation on

explain

hint(index)

Adds a ‘hint’, telling Mongo the proper index to use for the query.

Judicious use of hints can greatly improve query performance. When doing a query on multiple fields (at least one of which is indexed) pass the indexed field as a hint to the query. Raises OperationFailure if the provided hint requires an index that does not exist on this collection, and raises InvalidOperation if this cursor has already been used.

index should be an index as passed to create_index() (e.g. [('field', ASCENDING)]) or the name of the index. If index is None any existing hint for this query is cleared. The last hint applied to this cursor takes precedence over all others.

Parameters:
  • index: index to hint on (as an index specifier)
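
For example, a minimal sketch, assuming an index created with create_index([('field1', pymongo.ASCENDING)]):

async def f():
    cursor = collection.find({'field1': 1, 'field2': 2})
    # Tell the server to use the index on 'field1'.
    cursor.hint([('field1', pymongo.ASCENDING)])
    async for doc in cursor:
        print(doc)
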
limit(limit)

Limits the number of results to be returned by this cursor.

Raises TypeError if limit is not an integer. Raises InvalidOperation if this Cursor has already been used. The last limit applied to this cursor takes precedence. A limit of 0 is equivalent to no limit.

Parameters:
  • limit: the number of results to return

See also

The MongoDB documentation on

limit

max(spec)

Adds the max operator that specifies an upper bound for a specific index.

When using max, hint() should also be configured to ensure the query uses the expected index and starting in MongoDB 4.2 hint() will be required.

Parameters:
  • spec: a list of field, limit pairs specifying the exclusive upper bound for all keys of a specific index in order.
max_await_time_ms(max_await_time_ms)

Specifies a time limit for a getMore operation on a TAILABLE_AWAIT cursor. For all other types of cursor max_await_time_ms is ignored.

Raises TypeError if max_await_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.

Note

max_await_time_ms requires server version >= 3.2

Parameters:
  • max_await_time_ms: the time limit after which the operation is aborted
max_scan(max_scan)

DEPRECATED - Limit the number of documents to scan when performing the query.

Raises InvalidOperation if this cursor has already been used. Only the last max_scan() applied to this cursor has any effect.

Parameters:
  • max_scan: the maximum number of documents to scan
max_time_ms(max_time_ms)

Specifies a time limit for a query operation. If the specified time is exceeded, the operation will be aborted and ExecutionTimeout is raised. If max_time_ms is None no limit is applied.

Raises TypeError if max_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.

Parameters:
  • max_time_ms: the time limit after which the operation is aborted
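
For example, a minimal sketch that aborts a slow query after one second:

async def f():
    try:
        # The server aborts the query if it runs longer than 1000ms.
        docs = await collection.find().max_time_ms(1000).to_list(100)
    except pymongo.errors.ExecutionTimeout:
        logging.error('Query exceeded the time limit')
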
min(spec)

Adds the min operator that specifies a lower bound for a specific index.

When using min, hint() should also be configured to ensure the query uses the expected index and starting in MongoDB 4.2 hint() will be required.

Parameters:
  • spec: a list of field, limit pairs specifying the inclusive lower bound for all keys of a specific index in order.
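
For example, a minimal sketch combining min(), max(), and hint(), assuming an ascending index on the hypothetical field 'field':

async def f():
    cursor = collection.find()
    # Scan index keys from 10 (inclusive) up to 100 (exclusive).
    cursor.min([('field', 10)]).max([('field', 100)])
    cursor.hint([('field', pymongo.ASCENDING)])
    async for doc in cursor:
        print(doc)
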
next()

Advance the cursor.

New in version 2.2.

next_object()

DEPRECATED - Get a document from the most recently fetched batch, or None. See fetch_next.

The next_object() method is deprecated and will be removed in Motor 3.0. Use async for to elegantly iterate over MotorCursor objects instead.

Changed in version 2.2: Deprecated.

remove_option(mask)

Unset arbitrary query flags using a bitmask.

To unset the tailable flag: cursor.remove_option(2)

rewind()

Rewind this cursor to its unevaluated state.

skip(skip)

Skips the first skip results of this cursor.

Raises TypeError if skip is not an integer. Raises ValueError if skip is less than 0. Raises InvalidOperation if this Cursor has already been used. The last skip applied to this cursor takes precedence.

Parameters:
  • skip: the number of results to skip
sort(key_or_list, direction=None)

Sorts this cursor’s results.

Pass a field name and a direction, either ASCENDING or DESCENDING:

>>> async def f():
...     cursor = collection.find().sort('_id', pymongo.DESCENDING)
...     docs = await cursor.to_list(None)
...     print([d['_id'] for d in docs])
...
>>> IOLoop.current().run_sync(f)
[4, 3, 2, 1, 0]

To sort by multiple fields, pass a list of (key, direction) pairs:

>>> async def f():
...     cursor = collection.find().sort([
...         ('field1', pymongo.ASCENDING),
...         ('field2', pymongo.DESCENDING)])
...
...     docs = await cursor.to_list(None)
...     print([(d['field1'], d['field2']) for d in docs])
...
>>> IOLoop.current().run_sync(f)
[(0, 4), (0, 2), (0, 0), (1, 3), (1, 1)]

Text search results can be sorted by relevance:

>>> async def f():
...     cursor = collection.find({
...         '$text': {'$search': 'some words'}},
...         {'score': {'$meta': 'textScore'}})
...
...     # Sort by 'score' field.
...     cursor.sort([('score', {'$meta': 'textScore'})])
...     async for doc in cursor:
...         print('%.1f %s' % (doc['score'], doc['field']))
...
>>> IOLoop.current().run_sync(f)
1.5 words about some words
1.0 words

Raises InvalidOperation if this cursor has already been used. Only the last sort() applied to this cursor has any effect.

Parameters:
  • key_or_list: a single key or a list of (key, direction) pairs specifying the keys to sort on
  • direction (optional): only used if key_or_list is a single key, if not given ASCENDING is assumed
coroutine to_list(length)

Get a list of documents.

>>> from motor.motor_tornado import MotorClient
>>> collection = MotorClient().test.test_collection
>>>
>>> async def f():
...     cursor = collection.find().sort([('_id', 1)])
...     docs = await cursor.to_list(length=2)
...     while docs:
...         print(docs)
...         docs = await cursor.to_list(length=2)
...
...     print('done')
...
>>> ioloop.IOLoop.current().run_sync(f)
[{'_id': 0}, {'_id': 1}]
[{'_id': 2}, {'_id': 3}]
done
Parameters:
  • length: maximum number of documents to return for this call, or None

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: callback must be passed as a keyword argument, like to_list(10, callback=callback), and the length parameter is no longer optional.

where(code)

Adds a $where clause to this query.

The code argument must be an instance of str or Code containing a JavaScript expression. This expression will be evaluated for each document scanned. Only those documents for which the expression evaluates to true will be returned as results. The keyword this refers to the object currently being scanned. For example:

# Find all documents where field "a" is less than "b" plus "c".
async for doc in db.test.find().where('this.a < (this.b + this.c)'):
    print(doc)

Raises TypeError if code is not an instance of str. Raises InvalidOperation if this MotorCursor has already been used. Only the last call to where() applied to a MotorCursor has any effect.

Note

MongoDB 4.4 drops support for Code with scope variables. Consider using $expr instead.

Parameters:
  • code: JavaScript expression to use as a filter
address

The (host, port) of the server used, or None.

Changed in version 3.0: Renamed from “conn_id”.

alive

Does this cursor have the potential to return more data?

This is mostly useful with tailable cursors since they will stop iterating even though they may return more results in the future.

With regular cursors, simply use an async for loop instead of alive:

async for doc in collection.find():
    print(doc)

Note

Even if alive is True, next() can raise StopAsyncIteration. alive can also be True while iterating a cursor from a failed server. In this case alive will return False after next() fails to retrieve the next batch of results from the server.

cursor_id

Returns the id of the cursor.

Useful if you need to manage cursor ids and want to handle killing cursors manually using kill_cursors().

New in version 2.2.

fetch_next

DEPRECATED - A Future used with gen.coroutine to asynchronously retrieve the next document in the result set, fetching a batch of documents from the server if necessary. Resolves to False if there are no more documents, otherwise next_object() is guaranteed to return a document:

Attention

The fetch_next property is deprecated and will be removed in Motor 3.0. Use async for to iterate elegantly and efficiently over MotorCursor objects instead:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     async for doc in collection.find():
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

While it appears that fetch_next retrieves each document from the server individually, the cursor actually fetches documents efficiently in large batches. Example usage:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     cursor = collection.find().sort([('_id', 1)])
...     while (await cursor.fetch_next):
...         doc = cursor.next_object()
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

Changed in version 2.2: Deprecated.

session

The cursor’s ClientSession, or None.

New in version 3.6.

MotorCommandCursor

class motor.motor_tornado.MotorCommandCursor(cursor, collection)

Don’t construct a cursor yourself, but acquire one from methods like MotorCollection.find() or MotorCollection.aggregate().

Note

There is no need to manually close cursors; they are closed by the server after being fully iterated with to_list(), each(), or async for, or automatically closed by the client when the MotorCursor is cleaned up by the garbage collector.

close()

Explicitly kill this cursor on the server.

Call like:

await cursor.close()
each(callback)

Iterates over all the documents for this cursor.

each() returns immediately, and callback is executed asynchronously for each document. callback is passed (None, None) when iteration is complete.

Cancel iteration early by returning False from the callback. (Only False cancels iteration: returning None or 0 does not.)

>>> def each(result, error):
...     if error:
...         raise error
...     elif result:
...         sys.stdout.write(str(result['_id']) + ', ')
...     else:
...         # Iteration complete
...         IOLoop.current().stop()
...         print('done')
...
>>> cursor = collection.find().sort([('_id', 1)])
>>> cursor.each(callback=each)
>>> IOLoop.current().start()
0, 1, 2, 3, 4, done

Note

Unlike other Motor methods, each requires a callback and does not return a Future, so it cannot be used in a coroutine. async for and to_list() are much easier to use.

Parameters:
  • callback: function taking (document, error)
next()

Advance the cursor.

New in version 2.2.

next_object()

DEPRECATED - Get a document from the most recently fetched batch, or None. See fetch_next.

The next_object() method is deprecated and will be removed in Motor 3.0. Use async for to elegantly iterate over MotorCursor objects instead.

Changed in version 2.2: Deprecated.

coroutine to_list(length)

Get a list of documents.

>>> from motor.motor_tornado import MotorClient
>>> collection = MotorClient().test.test_collection
>>>
>>> async def f():
...     cursor = collection.find().sort([('_id', 1)])
...     docs = await cursor.to_list(length=2)
...     while docs:
...         print(docs)
...         docs = await cursor.to_list(length=2)
...
...     print('done')
...
>>> ioloop.IOLoop.current().run_sync(f)
[{'_id': 0}, {'_id': 1}]
[{'_id': 2}, {'_id': 3}]
done
Parameters:
  • length: maximum number of documents to return for this call, or None

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: callback must be passed as a keyword argument, like to_list(10, callback=callback), and the length parameter is no longer optional.

address

The (host, port) of the server used, or None.

New in version 3.0.

alive

Does this cursor have the potential to return more data?

Even if alive is True, next() can raise StopAsyncIteration. Best to use an async for loop:

async for doc in collection.aggregate(pipeline):
    print(doc)

Note

alive can be True while iterating a cursor from a failed server. In this case alive will return False after next() fails to retrieve the next batch of results from the server.

cursor_id

Returns the id of the cursor.

fetch_next

DEPRECATED - A Future used with gen.coroutine to asynchronously retrieve the next document in the result set, fetching a batch of documents from the server if necessary. Resolves to False if there are no more documents, otherwise next_object() is guaranteed to return a document:

Attention

The fetch_next property is deprecated and will be removed in Motor 3.0. Use async for to iterate elegantly and efficiently over MotorCursor objects instead:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     async for doc in collection.find():
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

While it appears that fetch_next retrieves each document from the server individually, the cursor actually fetches documents efficiently in large batches. Example usage:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     cursor = collection.find().sort([('_id', 1)])
...     while (await cursor.fetch_next):
...         doc = cursor.next_object()
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

Changed in version 2.2: Deprecated.

session

The cursor’s ClientSession, or None.

New in version 3.6.

Motor GridFS Classes

Store blobs of data in GridFS.

class motor.motor_tornado.MotorGridFSBucket

Create a new instance of MotorGridFSBucket.

Raises TypeError if database is not an instance of MotorDatabase.

Raises ConfigurationError if write_concern is not acknowledged.

Parameters:
  • database: database to use.
  • bucket_name (optional): The name of the bucket. Defaults to ‘fs’.
  • chunk_size_bytes (optional): The chunk size in bytes. Defaults to 255KB.
  • write_concern (optional): The WriteConcern to use. If None (the default) db.write_concern is used.
  • read_preference (optional): The read preference to use. If None (the default) db.read_preference is used.

See also

The MongoDB documentation on

gridfs
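
For example, a minimal sketch of creating buckets (my_database and the 'images' bucket name are hypothetical):

from motor.motor_tornado import MotorClient, MotorGridFSBucket

db = MotorClient().my_database

# The default bucket, stored in the "fs.files" and "fs.chunks" collections.
fs = MotorGridFSBucket(db)

# A separate bucket with a custom name and chunk size.
images = MotorGridFSBucket(db, bucket_name='images',
                           chunk_size_bytes=255 * 1024)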

coroutine delete(self, file_id)

Delete a file’s metadata and data chunks from a GridFS bucket:

async def delete():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    # Get _id of file to delete
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")
    await fs.delete(file_id)

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be deleted.

Returns a Future.

coroutine download_to_stream(self, file_id, destination)

Downloads the contents of the stored file specified by file_id and writes the contents to destination:

async def download():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    # Get _id of file to read
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")
    # Get file to write to
    file = open('myfile','wb+')
    await fs.download_to_stream(file_id, file)
    file.seek(0)
    contents = file.read()

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be downloaded.
  • destination: a file-like object implementing write().

Returns a Future.

coroutine download_to_stream_by_name(self, filename, destination, revision=-1)

Write the contents of filename (with optional revision) to destination.

For example:

async def download_by_name():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    # Get file to write to
    file = open('myfile','wb')
    await fs.download_to_stream_by_name("test_file", file)

Raises NoFile if no such version of that file exists.

Raises ValueError if filename is not a string.

Parameters:
  • filename: The name of the file to read from.
  • destination: A file-like object that implements write().
  • revision (optional): Which revision (documents with the same filename and different uploadDate) of the file to retrieve. Defaults to -1 (the most recent revision).
Note:

Revision numbers are defined as follows:

  • 0 = the original stored file
  • 1 = the first revision
  • 2 = the second revision
  • etc…
  • -2 = the second most recent revision
  • -1 = the most recent revision
find(self, *args, **kwargs)

Find and return the files collection documents that match filter.

Returns a cursor that iterates across files matching arbitrary queries on the files collection. Can be combined with other modifiers for additional control.

For example:

async def find():
    cursor = fs.find({"filename": "lisa.txt"},
                     no_cursor_timeout=True)

    async for grid_data in cursor:
        data = grid_data.read()

iterates through all versions of “lisa.txt” stored in GridFS. Setting no_cursor_timeout may be important to prevent the cursor from timing out during long multi-file processing work.

As another example, the call:

most_recent_three = fs.find().sort("uploadDate", -1).limit(3)

returns a cursor to the three most recently uploaded files in GridFS.

Follows a similar interface to find() in MotorCollection.

Parameters:
  • filter: Search query.
  • batch_size (optional): The number of documents to return per batch.
  • limit (optional): The maximum number of documents to return.
  • no_cursor_timeout (optional): The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to True to prevent that.
  • skip (optional): The number of documents to skip before returning.
  • sort (optional): The order by which to sort results. Defaults to None.

Returns a Future.

coroutine open_download_stream(self, file_id)

Opens a stream to read the contents of the stored file specified by file_id:

async def download_stream():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    # get _id of file to read.
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")
    grid_out = await fs.open_download_stream(file_id)
    contents = await grid_out.read()

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be downloaded.

Returns a Future that resolves to a MotorGridOut.

coroutine open_download_stream_by_name(self, filename, revision=-1)

Opens a stream to read the contents of filename and optional revision:

async def download_by_name():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    # Upload a file so there is something to read.
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")
    grid_out = await fs.open_download_stream_by_name("test_file")
    contents = await grid_out.read()

Raises NoFile if no such version of that file exists.

Raises ValueError if filename is not a string.

Parameters:
  • filename: The name of the file to read from.
  • revision (optional): Which revision (documents with the same filename and different uploadDate) of the file to retrieve. Defaults to -1 (the most recent revision).

Returns a Future that resolves to a MotorGridOut.

Note:

Revision numbers are defined as follows:

  • 0 = the original stored file
  • 1 = the first revision
  • 2 = the second revision
  • etc…
  • -2 = the second most recent revision
  • -1 = the most recent revision
open_upload_stream(self, filename, chunk_size_bytes=None, metadata=None)

Opens a stream for writing.

Specify the filename, and add any additional information in the metadata field of the file document or modify the chunk size:

async def upload():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    grid_in = fs.open_upload_stream(
        "test_file", chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

    await grid_in.write(b"data I want to store!")
    await grid_in.close()  # uploaded on close

Returns an instance of MotorGridIn.

Raises ValueError if filename is not a string.

Using the “async with” statement calls close() automatically:

async def upload():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    async with fs.open_upload_stream("test_file") as gridin:
        await gridin.write(b'First part\n')
        await gridin.write(b'Second part')

    # gridin is now closed automatically.
Parameters:
  • filename: The name of the file to upload.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes in MotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.
open_upload_stream_with_id(self, file_id, filename, chunk_size_bytes=None, metadata=None)

Opens a stream for writing.

Specify the file_id and filename, and add any additional information in the metadata field of the file document, or modify the chunk size:

async def upload():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    grid_in = fs.open_upload_stream_with_id(
        ObjectId(),
        "test_file",
        chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

    await grid_in.write(b"data I want to store!")
    await grid_in.close()  # uploaded on close

Returns an instance of MotorGridIn.

Raises ValueError if filename is not a string.

Parameters:
  • file_id: The id to use for this file. The id must not have already been used for another file.
  • filename: The name of the file to upload.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes in MotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.
coroutine rename(self, file_id, new_filename)

Renames the stored file with the specified file_id.

For example:

async def rename():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    # get _id of file to read.
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")

    await fs.rename(file_id, "new_test_name")

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be renamed.
  • new_filename: The new name of the file.

Returns a Future.

coroutine upload_from_stream(self, filename, source, chunk_size_bytes=None, metadata=None)

Uploads a user file to a GridFS bucket.

Reads the contents of the user file from source and uploads it to the file filename. Source can be a string or file-like object. For example:

async def upload_from_stream():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    file_id = await fs.upload_from_stream(
        "test_file",
        b"data I want to store!",
        chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

Raises ValueError if filename is not a string.

Parameters:
  • filename: The name of the file to upload.
  • source: The source stream of the content to be uploaded. Must be a file-like object that implements read() or a string.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes of MotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.

Returns a Future that resolves to the _id of the uploaded file.

coroutine upload_from_stream_with_id(self, file_id, filename, source, chunk_size_bytes=None, metadata=None)

Uploads a user file to a GridFS bucket with a custom file id.

Reads the contents of the user file from source and uploads it to the file filename. Source can be a string or file-like object. For example:

async def upload_from_stream_with_id():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    file_id = await fs.upload_from_stream_with_id(
        ObjectId(),
        "test_file",
        b"data I want to store!",
        chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

Raises ValueError if filename is not a string.

Parameters:
  • file_id: The id to use for this file. The id must not have already been used for another file.
  • filename: The name of the file to upload.
  • source: The source stream of the content to be uploaded. Must be a file-like object that implements read() or a string.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes of MotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.

Returns a Future.

class motor.motor_tornado.MotorGridIn(root_collection, delegate=None, session=None, disable_md5=False, **kwargs)

Class to write data to GridFS. Application developers should not generally need to instantiate this class - see open_upload_stream().

Any of the file level options specified in the GridFS Spec may be passed as keyword arguments. Any additional keyword arguments will be set as additional fields on the file document. Valid keyword arguments include:

  • "_id": unique ID for this file (default: ObjectId) - this "_id" must not have already been used for another file
  • "filename": human name for the file
  • "contentType" or "content_type": valid mime-type for the file
  • "chunkSize" or "chunk_size": size of each of the chunks, in bytes (default: 256 kb)
  • "encoding": encoding used for this file. In Python 2, any unicode that is written to the file will be converted to a str. In Python 3, any str that is written to the file will be converted to bytes.
Parameters:
  • root_collection: root collection to write to
  • session (optional): a ClientSession to use for all commands
  • disable_md5 (optional): When True, an MD5 checksum will not be computed for the uploaded file. Useful in environments where MD5 cannot be used for regulatory or other reasons. Defaults to False.
  • **kwargs (optional): file level options (see above)

Changed in version 0.2: open method removed, no longer needed.

coroutine abort()

Remove all chunks/files that may have been uploaded and close.

coroutine close()

Flush the file and close it.

A closed file cannot be written any more. Calling close() more than once is allowed.

coroutine set(name, value)

Set an arbitrary metadata attribute on the file. Stores value on the server as a key-value pair within the file document once the file is closed. If the file is already closed, calling set() will immediately update the file document on the server.

Metadata set on the file appears as attributes on a MotorGridOut object created from the file.

Parameters:
  • name: Name of the attribute, will be stored as a key in the file document on the server
  • value: Value of the attribute
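
For example, a minimal sketch in the style of the open_upload_stream() example above ('my_field' is an arbitrary attribute name):

async def set_metadata():
    my_db = MotorClient().test
    fs = MotorGridFSBucket(my_db)
    grid_in = fs.open_upload_stream('test_file')
    await grid_in.write(b'data I want to store!')
    # 'my_field' is stored in the file document on the server
    # once the file is closed.
    await grid_in.set('my_field', 'my_value')
    await grid_in.close()
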
coroutine write(data)

Write data to the file. There is no return value.

data can be either a string of bytes or a file-like object (implementing read()). If the file has an encoding attribute, data can also be a unicode (str in python 3) instance, which will be encoded as encoding before being written.

Due to buffering, the data may not actually be written to the database until the close() method is called. Raises ValueError if this file is already closed. Raises TypeError if data is not an instance of str (bytes in python 3), a file-like object, or an instance of unicode (str in python 3). Unicode data is only allowed if the file has an encoding attribute.

Parameters:
  • data: string of bytes or file-like object to be written to the file
coroutine writelines(sequence)

Write a sequence of strings to the file.

Does not add separators.

chunk_size

Chunk size for this file.

This attribute is read-only.

closed

Is this file closed?

content_type

Mime-type for this file.

filename

Name of this file.

length

Length (in bytes) of this file.

This attribute is read-only and can only be read after close() has been called.

md5

MD5 of the contents of this file if an md5 sum was created.

This attribute is read-only and can only be read after close() has been called.

name

Alias for filename.

upload_date

Date that this file was uploaded.

This attribute is read-only and can only be read after close() has been called.

class motor.motor_tornado.MotorGridOut(root_collection, file_id=None, file_document=None, delegate=None, session=None)
coroutine open()

Retrieve this file’s attributes from the server.

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: MotorGridOut now opens itself on demand, calling open explicitly is rarely needed.

coroutine read(size=-1)

Read at most size bytes from the file (less if there isn’t enough data).

The bytes are returned as an instance of str (bytes in python 3). If size is negative or omitted all data is read.

Parameters:
  • size (optional): the number of bytes to read
coroutine readchunk()

Reads a chunk at a time. If the current position is within a chunk the remainder of the chunk is returned.

coroutine readline(size=-1)

Read one line or up to size bytes from the file.

Parameters:
  • size (optional): the maximum number of bytes to read
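
For example, a minimal sketch that streams a stored file line by line (fs and file_id are assumed as in the MotorGridFSBucket examples above):

async def print_lines(fs, file_id):
    grid_out = await fs.open_download_stream(file_id)
    # readline() returns b'' once the end of the file is reached.
    line = await grid_out.readline()
    while line:
        print(line.decode())
        line = await grid_out.readline()
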
stream_to_handler(request_handler)

Write the contents of this file to a tornado.web.RequestHandler. This method calls flush() on the RequestHandler, so ensure all headers have already been set. For a more complete example see the implementation of GridFSHandler.

class FileHandler(tornado.web.RequestHandler):
    async def get(self, filename):
        db = self.settings['db']
        fs = motor.MotorGridFSBucket(db)
        try:
            gridout = await fs.open_download_stream_by_name(filename)
        except gridfs.NoFile:
            raise tornado.web.HTTPError(404)

        self.set_header("Content-Type", gridout.content_type)
        self.set_header("Content-Length", gridout.length)
        await gridout.stream_to_handler(self)
        self.finish()

See also

Tornado RequestHandler

aliases

List of aliases for this file.

This attribute is read-only.

chunk_size

Chunk size for this file.

This attribute is read-only.

close

Make GridOut more generically file-like.

content_type

Mime-type for this file.

This attribute is read-only.

filename

Name of this file.

This attribute is read-only.

length

Length (in bytes) of this file.

This attribute is read-only.

md5

MD5 of the contents of this file if an md5 sum was created.

This attribute is read-only.

metadata

Metadata attached to this file.

This attribute is read-only.

name

Alias for filename.

This attribute is read-only.

seek

Set the current position of this file.

Parameters:
  • pos: the position (or offset if using relative positioning) to seek to
  • whence (optional): where to seek from. os.SEEK_SET (0) for absolute file positioning, os.SEEK_CUR (1) to seek relative to the current position, os.SEEK_END (2) to seek relative to the file’s end.
tell

Return the current position of this file.
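
For example, a minimal sketch using seek and tell together (fs and file_id are assumed as in the MotorGridFSBucket examples above):

async def f(fs, file_id):
    grid_out = await fs.open_download_stream(file_id)
    head = await grid_out.read(10)    # Read the first 10 bytes.
    print(grid_out.tell())            # Prints 10.
    grid_out.seek(0)                  # Rewind to the beginning.
    contents = await grid_out.read()  # Read the entire file.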

upload_date

Date that this file was first uploaded.

This attribute is read-only.

class motor.motor_tornado.MotorGridOutCursor(cursor, collection)

Don’t construct a cursor yourself, but acquire one from methods like MotorCollection.find() or MotorCollection.aggregate().

Note

There is no need to manually close cursors; they are closed by the server after being fully iterated with to_list(), each(), or async for, or automatically closed by the client when the MotorCursor is cleaned up by the garbage collector.

allow_disk_use(allow_disk_use)

Specifies whether MongoDB can use temporary disk files while processing a blocking sort operation.

Raises TypeError if allow_disk_use is not a boolean.

Note

allow_disk_use requires server version >= 4.4

Parameters:
  • allow_disk_use: if True, MongoDB may use temporary disk files to store data exceeding the system memory limit while processing a blocking sort operation.
clone()

Get a clone of this cursor.

close()

Explicitly kill this cursor on the server.

Call like:

await cursor.close()
collation(collation)

Adds a Collation to this query.

This option is only supported on MongoDB 3.4 and above.

Raises TypeError if collation is not an instance of Collation or a dict. Raises InvalidOperation if this Cursor has already been used. Only the last collation applied to this cursor has any effect.

Parameters:
  • collation: An instance of Collation.
comment(comment)

Adds a ‘comment’ to the cursor.

http://docs.mongodb.org/manual/reference/operator/comment/

Parameters:
  • comment: A string to attach to the query to help interpret and trace the operation in the server logs and in profile data.
coroutine distinct(key)

Get a list of distinct values for key among all documents in the result set of this query.

Raises TypeError if key is not an instance of basestring (str in python 3).

The distinct() method obeys the read_preference of the Collection instance on which find() was called.

Parameters:
  • key: name of key for which we want to get the distinct values
each(callback)

Iterates over all the documents for this cursor.

each() returns immediately, and callback is executed asynchronously for each document. callback is passed (None, None) when iteration is complete.

Cancel iteration early by returning False from the callback. (Only False cancels iteration: returning None or 0 does not.)

>>> def each(result, error):
...     if error:
...         raise error
...     elif result:
...         sys.stdout.write(str(result['_id']) + ', ')
...     else:
...         # Iteration complete
...         IOLoop.current().stop()
...         print('done')
...
>>> cursor = collection.find().sort([('_id', 1)])
>>> cursor.each(callback=each)
>>> IOLoop.current().start()
0, 1, 2, 3, 4, done

Note

Unlike other Motor methods, each requires a callback and does not return a Future, so it cannot be used in a coroutine. async for and to_list() are much easier to use.

Parameters:
  • callback: function taking (document, error)
coroutine explain()

Returns an explain plan record for this cursor.

Note

Starting with MongoDB 3.2 explain() uses the default verbosity mode of the explain command, allPlansExecution. To use a different verbosity use command() to run the explain command directly.

See also

The MongoDB documentation on

explain

hint(index)

Adds a ‘hint’, telling Mongo the proper index to use for the query.

Judicious use of hints can greatly improve query performance. When doing a query on multiple fields (at least one of which is indexed) pass the indexed field as a hint to the query. Raises OperationFailure if the provided hint requires an index that does not exist on this collection, and raises InvalidOperation if this cursor has already been used.

index should be an index as passed to create_index() (e.g. [('field', ASCENDING)]) or the name of the index. If index is None any existing hint for this query is cleared. The last hint applied to this cursor takes precedence over all others.

Parameters:
  • index: index to hint on (as an index specifier)
limit(limit)

Limits the number of results to be returned by this cursor.

Raises TypeError if limit is not an integer. Raises InvalidOperation if this Cursor has already been used. The last limit applied to this cursor takes precedence. A limit of 0 is equivalent to no limit.

Parameters:
  • limit: the number of results to return

See also

The MongoDB documentation on

limit

max(spec)

Adds the max operator that specifies an upper bound for a specific index.

When using max, hint() should also be configured to ensure the query uses the expected index and starting in MongoDB 4.2 hint() will be required.

Parameters:
  • spec: a list of field, limit pairs specifying the exclusive upper bound for all keys of a specific index in order.
max_await_time_ms(max_await_time_ms)

Specifies a time limit for a getMore operation on a TAILABLE_AWAIT cursor. For all other types of cursor max_await_time_ms is ignored.

Raises TypeError if max_await_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.

Note

max_await_time_ms requires server version >= 3.2

Parameters:
  • max_await_time_ms: the time limit after which the operation is aborted
max_scan(max_scan)

DEPRECATED - Limit the number of documents to scan when performing the query.

Raises InvalidOperation if this cursor has already been used. Only the last max_scan() applied to this cursor has any effect.

Parameters:
  • max_scan: the maximum number of documents to scan
max_time_ms(max_time_ms)

Specifies a time limit for a query operation. If the specified time is exceeded, the operation will be aborted and ExecutionTimeout is raised. If max_time_ms is None no limit is applied.

Raises TypeError if max_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.

Parameters:
  • max_time_ms: the time limit after which the operation is aborted
min(spec)

Adds the min operator that specifies a lower bound for a specific index.

When using min, hint() should also be configured to ensure the query uses the expected index and starting in MongoDB 4.2 hint() will be required.

Parameters:
  • spec: a list of field, limit pairs specifying the inclusive lower bound for all keys of a specific index in order.
next()

Advance the cursor.

New in version 2.2.

next_object()

DEPRECATED - Get next GridOut object from cursor.

rewind()

Rewind this cursor to its unevaluated state.

skip(skip)

Skips the first skip results of this cursor.

Raises TypeError if skip is not an integer. Raises ValueError if skip is less than 0. Raises InvalidOperation if this Cursor has already been used. The last skip applied to this cursor takes precedence.

Parameters:
  • skip: the number of results to skip
sort(key_or_list, direction=None)

Sorts this cursor’s results.

Pass a field name and a direction, either ASCENDING or DESCENDING:

>>> async def f():
...     cursor = collection.find().sort('_id', pymongo.DESCENDING)
...     docs = await cursor.to_list(None)
...     print([d['_id'] for d in docs])
...
>>> IOLoop.current().run_sync(f)
[4, 3, 2, 1, 0]

To sort by multiple fields, pass a list of (key, direction) pairs:

>>> async def f():
...     cursor = collection.find().sort([
...         ('field1', pymongo.ASCENDING),
...         ('field2', pymongo.DESCENDING)])
...
...     docs = await cursor.to_list(None)
...     print([(d['field1'], d['field2']) for d in docs])
...
>>> IOLoop.current().run_sync(f)
[(0, 4), (0, 2), (0, 0), (1, 3), (1, 1)]

Text search results can be sorted by relevance:

>>> async def f():
...     cursor = collection.find({
...         '$text': {'$search': 'some words'}},
...         {'score': {'$meta': 'textScore'}})
...
...     # Sort by 'score' field.
...     cursor.sort([('score', {'$meta': 'textScore'})])
...     async for doc in cursor:
...         print('%.1f %s' % (doc['score'], doc['field']))
...
>>> IOLoop.current().run_sync(f)
1.5 words about some words
1.0 words

Raises InvalidOperation if this cursor has already been used. Only the last sort() applied to this cursor has any effect.

Parameters:
  • key_or_list: a single key or a list of (key, direction) pairs specifying the keys to sort on
  • direction (optional): only used if key_or_list is a single key, if not given ASCENDING is assumed
coroutine to_list(length)

Get a list of documents.

>>> from motor.motor_tornado import MotorClient
>>> collection = MotorClient().test.test_collection
>>>
>>> async def f():
...     cursor = collection.find().sort([('_id', 1)])
...     docs = await cursor.to_list(length=2)
...     while docs:
...         print(docs)
...         docs = await cursor.to_list(length=2)
...
...     print('done')
...
>>> ioloop.IOLoop.current().run_sync(f)
[{'_id': 0}, {'_id': 1}]
[{'_id': 2}, {'_id': 3}]
done
Parameters:
  • length: maximum number of documents to return for this call, or None

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: callback must be passed as a keyword argument, like to_list(10, callback=callback), and the length parameter is no longer optional.

where(code)

Adds a $where clause to this query.

The code argument must be an instance of str or Code containing a JavaScript expression. This expression will be evaluated for each document scanned. Only those documents for which the expression evaluates to true will be returned as results. The keyword this refers to the object currently being scanned. For example:

# Find all documents where field "a" is less than "b" plus "c".
async for doc in db.test.find().where('this.a < (this.b + this.c)'):
    print(doc)

Raises TypeError if code is not an instance of str. Raises InvalidOperation if this MotorCursor has already been used. Only the last call to where() applied to a MotorCursor has any effect.

Note

MongoDB 4.4 drops support for Code with scope variables. Consider using $expr instead.

Parameters:
  • code: JavaScript expression to use as a filter
address

The (host, port) of the server used, or None.

Changed in version 3.0: Renamed from “conn_id”.

alive

Does this cursor have the potential to return more data?

This is mostly useful with tailable cursors since they will stop iterating even though they may return more results in the future.

With regular cursors, simply use an async for loop instead of alive:

async for doc in collection.find():
    print(doc)

Note

Even if alive is True, next() can raise StopAsyncIteration. alive can also be True while iterating a cursor from a failed server. In this case alive will return False after next() fails to retrieve the next batch of results from the server.

cursor_id

Returns the id of the cursor.

Useful if you need to manage cursor ids and want to handle killing cursors manually using kill_cursors().

New in version 2.2.

fetch_next

DEPRECATED - A Future used with gen.coroutine to asynchronously retrieve the next document in the result set, fetching a batch of documents from the server if necessary. Resolves to False if there are no more documents, otherwise next_object() is guaranteed to return a document:

Attention

The fetch_next property is deprecated and will be removed in Motor 3.0. Use async for to iterate elegantly and efficiently over MotorCursor objects instead:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     async for doc in collection.find():
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

While it appears that fetch_next retrieves each document from the server individually, the cursor actually fetches documents efficiently in large batches. Example usage:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     cursor = collection.find().sort([('_id', 1)])
...     while (await cursor.fetch_next):
...         doc = cursor.next_object()
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

Changed in version 2.2: Deprecated.

session

The cursor’s ClientSession, or None.

New in version 3.6.

motor.web - Integrate Motor with the Tornado web framework

Utilities for using Motor with Tornado web applications.

class motor.web.GridFSHandler(application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs)

A handler that can serve content from GridFS, very similar to tornado.web.StaticFileHandler.

import motor.web
from tornado import web

db = motor.MotorClient().my_database
application = web.Application([
    (r"/static/(.*)", motor.web.GridFSHandler, {"database": db}),
])

By default, requests’ If-Modified-Since headers are honored, but no specific cache-control timeout is sent to clients. Thus each request for a GridFS file requires a quick check of the file’s uploadDate in MongoDB. Override get_cache_time() in a subclass to customize this.

get_cache_time(path, modified, mime_type)

Override to customize cache control behavior.

Return a positive number of seconds to trigger aggressive caching, or 0 (the default) to only mark the resource as cacheable.
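
For example, a hypothetical subclass that caches images aggressively and leaves other types with the default behavior:

class CachingGridFSHandler(motor.web.GridFSHandler):
    def get_cache_time(self, path, modified, mime_type):
        # Cache images for one day; everything else falls back
        # to If-Modified-Since checks only.
        return 86400 if mime_type.startswith('image/') else 0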

get_gridfs_file(bucket, filename, request)

Overridable method to choose a GridFS file to serve at a URL.

By default, if a URL pattern like "/static/(.*)" is mapped to this GridFSHandler, then the trailing portion of the URL is used as the filename, so a request for "/static/image.png" results in a call to MotorGridFSBucket.open_download_stream_by_name() with "image.png" as the filename argument. To customize the mapping of path to GridFS file, override get_gridfs_file and return a Future that resolves to a MotorGridOut.

For example, to retrieve the file by _id instead of filename:

class CustomGridFSHandler(motor.web.GridFSHandler):
    def get_gridfs_file(self, bucket, filename, request):
        # The path is interpreted as an _id instead of a name.
        # Return a Future that resolves to a MotorGridOut.
        return bucket.open_download_stream(file_id=ObjectId(filename))
Parameters:
  • bucket: The MotorGridFSBucket serving this request.
  • filename: A string, the matched portion of the URL pattern.
  • request: The tornado.httputil.HTTPServerRequest being served.

Changed in version 1.0: BREAKING CHANGE: Now takes a MotorGridFSBucket, not a MotorGridFS. Also takes an additional request parameter.

Changed in version 0.2: get_gridfs_file no longer accepts a callback, instead returns a Future.

set_extra_headers(path, gridout)

For subclasses to add extra headers to the response.

This page describes using Motor with Tornado. For asyncio integration, see Motor asyncio API.

Motor asyncio API

AsyncIOMotorClient – Connection to MongoDB

class motor.motor_asyncio.AsyncIOMotorClient(*args, **kwargs)

Create a new connection to a single MongoDB instance at host:port.

Takes the same constructor arguments as MongoClient, as well as:

Parameters:
  • io_loop (optional): Special event loop instance to use instead of default
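
For example, a minimal sketch (localhost:27017 is a placeholder address):

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient('mongodb://localhost:27017')

async def ping():
    # Like MotorClient, the client connects lazily, on the
    # first operation.
    print(await client.admin.command('ping'))

asyncio.get_event_loop().run_until_complete(ping())
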
client[db_name] || client.db_name

Get the db_name AsyncIOMotorDatabase on AsyncIOMotorClient client.

Raises InvalidName if an invalid database name is used.

coroutine drop_database(name_or_database, session=None)

Drop a database.

Raises TypeError if name_or_database is not an instance of basestring (str in python 3) or Database.

Parameters:
  • name_or_database: the name of a database to drop, or a Database instance representing the database to drop
  • session (optional): a ClientSession.

Note

The write_concern of this client is automatically applied to this operation when using MongoDB >= 3.4.

coroutine fsync(**kwargs)

DEPRECATED: Flush all pending writes to datafiles.

Optional parameters can be passed as keyword arguments:
  • lock: If True lock the server to disallow writes.
  • async: If True don’t block while synchronizing.
  • session (optional): a ClientSession, created with start_session().

Note

Starting with Python 3.7 async is a reserved keyword. The async option to the fsync command can be passed using a dictionary instead:

options = {'async': True}
await client.fsync(**options)

Deprecated. Run the fsync command directly with command() instead. For example:

await client.admin.command('fsync', lock=True)

Changed in version 2.2: Deprecated.

Changed in version 1.2: Added session parameter.

Warning

async and lock cannot be used together.

Warning

MongoDB does not support the async option on Windows and will raise an exception on that platform.

get_database(name=None, codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a MotorDatabase with the given name and options.

Useful for creating a MotorDatabase with different codec options, read preference, and/or write concern from this MotorClient.

>>> from pymongo import ReadPreference
>>> client.read_preference == ReadPreference.PRIMARY
True
>>> db1 = client.test
>>> db1.read_preference == ReadPreference.PRIMARY
True
>>> db2 = client.get_database(
...     'test', read_preference=ReadPreference.SECONDARY)
>>> db2.read_preference == ReadPreference.SECONDARY
True
Parameters:
  • name: The name of the database - a string.
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this MotorClient is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this MotorClient is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this MotorClient is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this MotorClient is used.
get_default_database(default=None, codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get the database named in the MongoDB connection URI.

>>> uri = 'mongodb://host/my_database'
>>> client = MotorClient(uri)
>>> db = client.get_default_database()
>>> assert db.name == 'my_database'
>>> db = client.get_default_database('fallback_db_name')
>>> assert db.name == 'my_database'
>>> uri_without_database = 'mongodb://host/'
>>> client = MotorClient(uri_without_database)
>>> db = client.get_default_database('fallback_db_name')
>>> assert db.name == 'fallback_db_name'

Useful in scripts where you want to choose which database to use based only on the URI in a configuration file.

Parameters:
  • default (optional): the database name to use if no database name was provided in the URI.
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this MotorClient is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this MotorClient is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this MotorClient is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this MotorClient is used.

New in version 2.1: Revived this method. Added the default, codec_options, read_preference, write_concern and read_concern parameters.

Changed in version 2.0: Removed this method.

coroutine list_database_names(session=None)

Get a list of the names of all databases on the connected server.

Parameters:
  • session (optional): a ClientSession.

coroutine list_databases(session=None, **kwargs)

Get a cursor over the databases of the connected server.

Parameters:
  • session (optional): a ClientSession.
  • **kwargs (optional): Optional parameters of the listDatabases command can be passed as keyword arguments to this method. The supported options differ by server version.
Returns:

An instance of CommandCursor.
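
For example, a minimal sketch that awaits the cursor and iterates it (assumes an existing client):

async def show_databases():
    # list_databases is a coroutine, so await it to get the cursor.
    cursor = await client.list_databases()
    async for info in cursor:
        print(info['name'])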

coroutine server_info(session=None)

Get information about the MongoDB server we’re connected to.

Parameters:
  • session (optional): a ClientSession.

coroutine start_session(causal_consistency=True, default_transaction_options=None)

Start a logical session.

This method takes the same parameters as PyMongo’s SessionOptions. See the client_session module for details.

This session is created uninitialized; use it in an await expression to initialize it, or in an async with statement.

async def coro():
    collection = client.db.collection

    # End the session after using it.
    s = await client.start_session()
    await s.end_session()

    # Or, use an "async with" statement to end the session
    # automatically.
    async with await client.start_session() as s:
        doc = {'_id': ObjectId(), 'x': 1}
        await collection.insert_one(doc, session=s)

        secondary = collection.with_options(
            read_preference=ReadPreference.SECONDARY)

        # Sessions are causally consistent by default, so we can read
        # the doc we just inserted, even reading from a secondary.
        async for doc in secondary.find(session=s):
            print(doc)
            
    # Run a multi-document transaction:
    async with await client.start_session() as s:
        # Note, start_transaction doesn't require "await".
        async with s.start_transaction():
            await collection.delete_one({'x': 1}, session=s)
            await collection.insert_one({'x': 2}, session=s)
        
        # Exiting the "with s.start_transaction()" block while throwing an
        # exception automatically aborts the transaction, exiting the block
        # normally automatically commits it.

        # You can run additional transactions in the same session, so long as 
        # you run them one at a time.
        async with s.start_transaction():
            await collection.insert_one({'x': 3}, session=s)
            await collection.update_many({'x': {'$gte': 2}},
                                         {'$inc': {'x': 1}},
                                         session=s)

Requires MongoDB 3.6. Do not use the same session for multiple operations concurrently. A MotorClientSession may only be used with the MotorClient that started it.

Returns:An instance of MotorClientSession.

Changed in version 2.0: Returns a MotorClientSession. Before, this method returned a PyMongo ClientSession.

New in version 1.2.

coroutine unlock(session=None)

DEPRECATED: Unlock a previously locked server.

Parameters:
  • session (optional): a ClientSession.

Deprecated. Users of MongoDB version 3.2 or newer can run the fsyncUnlock command directly with command():

await motor_client.admin.command('fsyncUnlock')

Users of MongoDB version 3.0 can query the “unlock” virtual collection:

await motor_client.admin["$cmd.sys.unlock"].find_one()

Changed in version 2.2: Deprecated.

watch(pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None)

Watch changes on this cluster.

Returns a MotorChangeStream cursor which iterates over changes on all databases in this cluster. Introduced in MongoDB 4.0.

See the documentation for MotorCollection.watch() for more details and examples.

Parameters:
  • pipeline (optional): A list of aggregation pipeline stages to append to an initial $changeStream stage. Not all pipeline stages are valid after a $changeStream stage, see the MongoDB documentation on change streams for the supported stages.
  • full_document (optional): The fullDocument option to pass to the $changeStream stage. Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  • resume_after (optional): A resume token. If provided, the change stream will start returning changes that occur directly after the operation specified in the resume token. A resume token is the _id value of a change document.
  • max_await_time_ms (optional): The maximum time in milliseconds for the server to wait for changes before responding to a getMore operation.
  • batch_size (optional): The maximum number of documents to return per batch.
  • collation (optional): The Collation to use for the aggregation.
  • start_at_operation_time (optional): If provided, the resulting change stream will only return changes that occurred at or after the specified Timestamp. Requires MongoDB >= 4.0.
  • session (optional): a ClientSession.
  • start_after (optional): The same as resume_after except that start_after can resume notifications after an invalidate event. This option and resume_after are mutually exclusive.
Returns:

A MotorChangeStream.

Changed in version 2.1: Added the start_after parameter.

New in version 2.0.

See also

The MongoDB documentation on

changeStreams
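
As a minimal sketch, a change stream over the whole cluster can be consumed with async for (assumes a MongoDB 4.0+ replica set or sharded cluster):

async def watch_cluster():
    async with client.watch() as stream:
        async for change in stream:
            print(change['operationType'])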

HOST

The default host name ('localhost') used when none is supplied to the constructor.

PORT

The default port number (27017) used when none is supplied to the constructor.

address

(host, port) of the current standalone, primary, or mongos, or None.

Accessing address raises InvalidOperation if the client is load-balancing among mongoses, since there is no single address. Use nodes instead.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

New in version 3.0.

arbiters

Arbiters in the replica set.

A sequence of (host, port) pairs. Empty if this client is not connected to a replica set, there are no arbiters, or this client was created without the replicaSet option.

close()

Cleanup client resources and disconnect from MongoDB.

On MongoDB >= 3.6, end all server sessions created by this client by sending one or more endSessions commands.

Close all sockets in the connection pools and stop the monitor threads. If this instance is used again it will be automatically re-opened and the threads restarted unless auto encryption is enabled. A client enabled with auto encryption cannot be used again after being closed; any attempt will raise InvalidOperation.

Changed in version 3.6: End all server sessions created by this client.

codec_options

Read only access to the CodecOptions of this instance.

event_listeners

The event listeners registered for this client.

See monitoring for details.

is_mongos

If this client is connected to mongos. If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

is_primary

If this client is connected to a server that can accept writes.

True if the current server is a standalone, mongos, or the primary of a replica set. If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

local_threshold_ms

The local threshold for this instance.

max_bson_size

The largest BSON object the connected server accepts in bytes.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

max_idle_time_ms

The maximum number of milliseconds that a connection can remain idle in the pool before being removed and replaced. Defaults to None (no limit).

max_message_size

The largest message the connected server accepts in bytes.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

max_pool_size

The maximum allowable number of concurrent connections to each connected server. Requests to a server will block if there are maxPoolSize outstanding connections to the requested server. Defaults to 100. Cannot be 0.

When a server’s pool has reached max_pool_size, operations for that server block waiting for a socket to be returned to the pool. If waitQueueTimeoutMS is set, a blocked operation will raise ConnectionFailure after a timeout. By default waitQueueTimeoutMS is not set.

max_write_batch_size

The maxWriteBatchSize reported by the server.

If the client is not connected, this will block until a connection is established or raise ServerSelectionTimeoutError if no server is available.

Returns a default value when connected to server versions prior to MongoDB 2.6.

min_pool_size

The minimum required number of concurrent connections that the pool will maintain to each connected server. Default is 0.

nodes

Set of all currently connected servers.

Warning

When connected to a replica set the value of nodes can change over time as MongoClient’s view of the replica set changes. nodes can also be an empty set when MongoClient is first instantiated and hasn’t yet connected to any servers, or a network partition causes it to lose connection to all servers.

primary

The (host, port) of the current primary of the replica set.

Returns None if this client is not connected to a replica set, there is no primary, or this client was created without the replicaSet option.

New in version 3.0: MongoClient gained this property in version 3.0 when MongoReplicaSetClient’s functionality was merged in.

read_concern

Read only access to the ReadConcern of this instance.

New in version 3.2.

read_preference

Read only access to the read preference of this instance.

Changed in version 3.0: The read_preference attribute is now read only.

retry_reads

If this instance should retry supported read operations.

retry_writes

If this instance should retry supported write operations.

secondaries

The secondary members known to this client.

A sequence of (host, port) pairs. Empty if this client is not connected to a replica set, there are no visible secondaries, or this client was created without the replicaSet option.

New in version 3.0: MongoClient gained this property in version 3.0 when MongoReplicaSetClient’s functionality was merged in.

server_selection_timeout

The server selection timeout for this instance in seconds.

write_concern

Read only access to the WriteConcern of this instance.

Changed in version 3.0: The write_concern attribute is now read only.

AsyncIOMotorClientSession – Sequence of operations

class motor.motor_asyncio.AsyncIOMotorClientSession(delegate, motor_client)
coroutine abort_transaction()

Abort a multi-statement transaction.

coroutine commit_transaction()

Commit a multi-statement transaction.

coroutine end_session()

Finish this session. If a transaction has started, abort it.

It is an error to use the session after the session has ended.

start_transaction(read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None)

Start a multi-statement transaction.

Takes the same arguments as TransactionOptions.

Best used in a context manager block:

# Use "await" for start_session, but not for start_transaction.
async with await client.start_session() as s:
    async with s.start_transaction():
        await collection.delete_one({'x': 1}, session=s)
        await collection.insert_one({'x': 2}, session=s)

with_transaction(coro, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None)

Executes an awaitable in a transaction.

This method starts a transaction on this session, awaits coro once, and then commits the transaction. For example:

async def coro(session):
    orders = session.client.db.orders
    inventory = session.client.db.inventory
    inserted_id = await orders.insert_one(
        {"sku": "abc123", "qty": 100}, session=session)
    await inventory.update_one(
        {"sku": "abc123", "qty": {"$gte": 100}},
        {"$inc": {"qty": -100}}, session=session)
    return inserted_id

async with await client.start_session() as session:
    inserted_id = await session.with_transaction(coro)

To pass arbitrary arguments to the coro, wrap it with a lambda like this:

async def coro(session, custom_arg, custom_kwarg=None):
    # Transaction operations...

async with await client.start_session() as session:
    await session.with_transaction(
        lambda s: coro(s, "custom_arg", custom_kwarg=1))

In the event of an exception, with_transaction may retry the commit or the entire transaction, therefore coro may be awaited multiple times by a single call to with_transaction. Developers should be mindful of this possibility when writing a coro that modifies application state or has any other side-effects. Note that even when the coro is invoked multiple times, with_transaction ensures that the transaction will be committed at-most-once on the server.

The coro should not attempt to start new transactions, but should simply run operations meant to be contained within a transaction. The coro should also not commit the transaction; this is handled automatically by with_transaction. If the coro does commit or abort the transaction without error, however, with_transaction will return without taking further action.

When coro raises an exception, with_transaction automatically aborts the current transaction. When coro or commit_transaction() raises an exception that includes the "TransientTransactionError" error label, with_transaction starts a new transaction and re-executes the coro.

When commit_transaction() raises an exception with the "UnknownTransactionCommitResult" error label, with_transaction retries the commit until the result of the transaction is known.

This method will cease retrying after 120 seconds has elapsed. This timeout is not configurable and any exception raised by the coro or by ClientSession.commit_transaction() after the timeout is reached will be re-raised. Applications that desire a different timeout duration should not use this method.

Parameters:
  • coro: The coroutine to run inside a transaction. The coroutine must accept a single argument, this session. Note, under certain error conditions the coroutine may be run multiple times.
  • read_concern (optional): The ReadConcern to use for this transaction.
  • write_concern (optional): The WriteConcern to use for this transaction.
  • read_preference (optional): The read preference to use for this transaction. If None (the default) the read_preference of this Database is used. See read_preferences for options.
  • max_commit_time_ms (optional): The maximum amount of time to allow a single commitTransaction command to run, in milliseconds. Requires MongoDB 4.2+.
Returns:

The return value of the coro.

New in version 2.1.

advance_cluster_time(cluster_time)

Update the cluster time for this session.

Parameters:
  • cluster_time: The cluster_time from another ClientSession instance.
advance_operation_time(operation_time)

Update the operation time for this session.

Parameters:
  • operation_time: The operation_time from another ClientSession instance.
client

The MotorClient this session was created from.

cluster_time

The cluster time returned by the last operation executed in this session.

has_ended

True if this session is finished.

in_transaction

True if this session has an active multi-statement transaction.

New in version 3.10.

operation_time

The operation time returned by the last operation executed in this session.

options

The SessionOptions this session was created with.

session_id

A BSON document, the opaque server session identifier.

AsyncIOMotorDatabase

class motor.motor_asyncio.AsyncIOMotorDatabase(client, name, **kwargs)
db[collection_name] || db.collection_name

Get the collection_name AsyncIOMotorCollection of AsyncIOMotorDatabase db.

Raises InvalidName if an invalid collection name is used.

aggregate(pipeline, **kwargs)

Execute an aggregation pipeline on this database.

Introduced in MongoDB 3.6.

The aggregation can be run on a secondary if the client is connected to a replica set and its read_preference is not PRIMARY. The aggregate() method obeys the read_preference of this MotorDatabase, except when $out or $merge are used, in which case PRIMARY is used.

All optional aggregate command parameters should be passed as keyword arguments to this method. Valid options include, but are not limited to:

  • allowDiskUse (bool): Enables writing to temporary files. When set to True, aggregation stages can write data to the _tmp subdirectory of the --dbpath directory. The default is False.
  • maxTimeMS (int): The maximum amount of time to allow the operation to run in milliseconds.
  • batchSize (int): The maximum number of documents to return per batch. Ignored if the connected mongod or mongos does not support returning aggregate results using a cursor.
  • collation (optional): An instance of Collation.

Returns a MotorCommandCursor that can be iterated like a cursor from find():

async def f():
    # Lists all operations currently running on the server.
    pipeline = [{"$currentOp": {}}]
    async for operation in client.admin.aggregate(pipeline):
        print(operation)

Note

This method does not support the ‘explain’ option. Please use MotorDatabase.command() instead.

Note

The MotorDatabase.write_concern of this database is automatically applied to this operation.

New in version 2.1.

coroutine command(command, value=1, check=True, allowable_errors=None, read_preference=None, codec_options=CodecOptions(document_class=dict, tz_aware=False, uuid_representation=UuidRepresentation.PYTHON_LEGACY, unicode_decode_error_handler='strict', tzinfo=None, type_registry=TypeRegistry(type_codecs=[], fallback_encoder=None)), session=None, **kwargs)

Issue a MongoDB command.

Send command command to the database and return the response. If command is a string then the command {command: value} will be sent. Otherwise, command must be a dict and will be sent as-is.

Additional keyword arguments are added to the final command document before it is sent.

For example, a command like {buildinfo: 1} can be sent using:

result = await db.command("buildinfo")

For a command where the value matters, like {collstats: collection_name} we can do:

result = await db.command("collstats", collection_name)

For commands that take additional arguments we can use kwargs. So {filemd5: object_id, root: file_root} becomes:

result = await db.command("filemd5", object_id, root=file_root)
Parameters:
  • command: document representing the command to be issued, or the name of the command (for simple commands only).

    Note

    the order of keys in the command document is significant (the “verb” must come first), so commands which require multiple keys (e.g. findandmodify) should use an instance of SON or a string and kwargs instead of a Python dict.

  • value (optional): value to use for the command verb when command is passed as a string

  • check (optional): check the response for errors, raising OperationFailure if there are any

  • allowable_errors: if check is True, error messages in this list will be ignored by error-checking

  • read_preference: The read preference for this operation. See read_preferences for options.

  • session (optional): a ClientSession, created with start_session().

  • **kwargs (optional): additional keyword arguments will be added to the command document before it is sent

Changed in version 1.2: Added session parameter.

See also

The MongoDB documentation on

commands

coroutine create_collection(name, codec_options=None, read_preference=None, write_concern=None, read_concern=None, session=None, **kwargs)

Create a new Collection in this database.

Normally collection creation is automatic. This method should only be used to specify options on creation. CollectionInvalid will be raised if the collection already exists.

Options should be passed as keyword arguments to this method. Supported options vary with MongoDB release. Some examples include:

  • “size”: desired initial size for the collection (in bytes). For capped collections this size is the max size of the collection.
  • “capped”: if True, this is a capped collection
  • “max”: maximum number of objects if capped (optional)

See the MongoDB documentation for a full list of supported options by server version.

Parameters:
  • name: the name of the collection to create
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Database is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Database is used.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Database is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Database is used.
  • collation (optional): An instance of Collation.
  • session (optional): a ClientSession.
  • **kwargs (optional): additional keyword arguments will be passed as options for the create collection command
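
For example, a minimal sketch creating a capped collection (the collection name and size are placeholders):

async def make_capped_log():
    # Capped collections require an explicit maximum size in bytes.
    await db.create_collection('log', capped=True, size=4 * 1024 * 1024)
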
coroutine current_op(include_all=False, session=None)

DEPRECATED: Get information on operations currently running.

Starting with MongoDB 3.6 this helper is obsolete. The functionality provided by this helper is available in MongoDB 3.6+ using the $currentOp aggregation pipeline stage, which can be used with aggregate(). Note that, while this helper can only return a single document limited to a 16MB result, aggregate() returns a cursor avoiding that limitation.

Users of MongoDB versions older than 3.6 can use the currentOp command directly:

# MongoDB 3.2 and 3.4
await client.admin.command("currentOp")

Or query the “inprog” virtual collection:

# MongoDB 2.6 and 3.0
await client.admin["$cmd.sys.inprog"].find_one()
Parameters:
  • include_all (optional): if True also list currently idle operations in the result
  • session (optional): a ClientSession, created with start_session().

Changed in version 2.1: Deprecated, use aggregate() instead.

Changed in version 1.2: Added session parameter.

coroutine dereference(dbref, session=None, **kwargs)

Dereference a DBRef, getting the document it points to.

Raises TypeError if dbref is not an instance of DBRef. Returns a document, or None if the reference does not point to a valid document. Raises ValueError if dbref has a database specified that is different from the current database.

Parameters:
  • dbref: the reference
  • session (optional): a ClientSession.
  • **kwargs (optional): any additional keyword arguments are the same as the arguments to find().
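
A minimal sketch, assuming a collection name and ObjectId chosen for illustration:

from bson import ObjectId
from bson.dbref import DBRef

async def follow_reference():
    ref = DBRef('things', ObjectId('54f4e12bfba5220aa4d6dee8'))
    doc = await db.dereference(ref)
    print(doc)  # None if the reference matches no document
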
coroutine drop_collection(name_or_collection, session=None)

Drop a collection.

Parameters:
  • name_or_collection: the name of a collection to drop or the collection object itself
  • session (optional): a ClientSession.

Note

The write_concern of this database is automatically applied to this operation when using MongoDB >= 3.4.

get_collection(name, codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a Collection with the given name and options.

Useful for creating a Collection with different codec options, read preference, and/or write concern from this Database.

>>> db.read_preference
Primary()
>>> coll1 = db.test
>>> coll1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> coll2 = db.get_collection(
...     'test', read_preference=ReadPreference.SECONDARY)
>>> coll2.read_preference
Secondary(tag_sets=None)
Parameters:
  • name: The name of the collection - a string.
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Database is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Database is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Database is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Database is used.
coroutine list_collection_names(session=None, filter=None, **kwargs)

Get a list of all the collection names in this database.

For example, to list all non-system collections:

filter = {"name": {"$regex": r"^(?!system\.)"}}
names = await db.list_collection_names(filter=filter)
Parameters:
  • session (optional): a ClientSession, created with start_session().
  • filter (optional): A query document to filter the list of collections returned from the listCollections command.
  • **kwargs (optional): Optional parameters of the listCollections command can be passed as keyword arguments to this method. The supported options differ by server version.

Changed in version 2.1: Added the filter and **kwargs parameters.

New in version 1.2.

coroutine list_collections(session=None, filter=None, **kwargs)

Get a cursor over the collections of this database.

Parameters:
  • session (optional): a ClientSession.
  • filter (optional): A query document to filter the list of collections returned from the listCollections command.
  • **kwargs (optional): Optional parameters of the listCollections command can be passed as keyword arguments to this method. The supported options differ by server version.
Returns:

An instance of CommandCursor.

coroutine profiling_info(session=None)

Returns a list containing current profiling information.

Parameters:
  • session (optional): a ClientSession.

See also

The MongoDB documentation on

profiling

coroutine profiling_level(session=None)

Get the database’s current profiling level.

Returns one of (OFF, SLOW_ONLY, ALL).

Parameters:
  • session (optional): a ClientSession.

See also

The MongoDB documentation on

profiling

coroutine set_profiling_level(level, slow_ms=None, session=None)

Set the database’s profiling level.

Parameters:
  • level: Specifies a profiling level, see list of possible values below.
  • slow_ms: Optionally modify the threshold at which the profiler considers a query or operation slow. Even if the profiler is off, queries slower than the slow_ms level will get written to the logs.
  • session (optional): a ClientSession.

Possible level values:

Level Setting
OFF Off. No profiling.
SLOW_ONLY On. Only includes slow operations.
ALL On. Includes all operations.

Raises ValueError if level is not one of (OFF, SLOW_ONLY, ALL).
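
A minimal sketch, using the profiling-level constants exported by pymongo:

from pymongo import SLOW_ONLY

async def enable_profiling():
    # Log operations slower than 200 milliseconds.
    await db.set_profiling_level(SLOW_ONLY, slow_ms=200)
    print(await db.profiling_level())  # 1, i.e. SLOW_ONLY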

See also

The MongoDB documentation on

profiling

coroutine validate_collection(name_or_collection, scandata=False, full=False, session=None, background=None)

Validate a collection.

Returns a dict of validation info. Raises CollectionInvalid if validation fails.

See also the MongoDB documentation on the validate command.

Parameters:
  • name_or_collection: A Collection object or the name of a collection to validate.
  • scandata: Do extra checks beyond checking the overall structure of the collection.
  • full: Have the server do a more thorough scan of the collection. Use with scandata for a thorough scan of the structure of the collection and the individual documents.
  • session (optional): a ClientSession.
  • background (optional): A boolean flag that determines whether the command runs in the background. Requires MongoDB 4.4+.
watch(pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None)

Watch changes on this database.

Returns a MotorChangeStream cursor which iterates over changes on this database. Introduced in MongoDB 4.0.

See the documentation for MotorCollection.watch() for more details and examples.

Parameters:
  • pipeline (optional): A list of aggregation pipeline stages to append to an initial $changeStream stage. Not all pipeline stages are valid after a $changeStream stage, see the MongoDB documentation on change streams for the supported stages.
  • full_document (optional): The fullDocument option to pass to the $changeStream stage. Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  • resume_after (optional): A resume token. If provided, the change stream will start returning changes that occur directly after the operation specified in the resume token. A resume token is the _id value of a change document.
  • max_await_time_ms (optional): The maximum time in milliseconds for the server to wait for changes before responding to a getMore operation.
  • batch_size (optional): The maximum number of documents to return per batch.
  • collation (optional): The Collation to use for the aggregation.
  • start_at_operation_time (optional): If provided, the resulting change stream will only return changes that occurred at or after the specified Timestamp. Requires MongoDB >= 4.0.
  • session (optional): a ClientSession.
  • start_after (optional): The same as resume_after except that start_after can resume notifications after an invalidate event. This option and resume_after are mutually exclusive.
Returns:

A MotorChangeStream.

Changed in version 2.1: Added the start_after parameter.

New in version 2.0.

See also

The MongoDB documentation on

changeStreams

with_options(codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a clone of this database changing the specified settings.

>>> db1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> db2 = db1.with_options(read_preference=ReadPreference.SECONDARY)
>>> db1.read_preference
Primary()
>>> db2.read_preference
Secondary(tag_sets=None)
Parameters:
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Collection is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Collection is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Collection is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Collection is used.

New in version 3.8.

client

This MotorDatabase’s MotorClient.

codec_options

Read only access to the CodecOptions of this instance.

incoming_copying_manipulators

DEPRECATED: All incoming SON copying manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

incoming_manipulators

DEPRECATED: All incoming SON manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

name

The name of this Database.

outgoing_copying_manipulators

DEPRECATED: All outgoing SON copying manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

outgoing_manipulators

DEPRECATED: All outgoing SON manipulators.

Changed in version 3.5: Deprecated.

New in version 2.0.

read_concern

Read only access to the ReadConcern of this instance.

New in version 3.2.

read_preference

Read only access to the read preference of this instance.

Changed in version 3.0: The read_preference attribute is now read only.

write_concern

Read only access to the WriteConcern of this instance.

Changed in version 3.0: The write_concern attribute is now read only.

AsyncIOMotorCollection

class motor.motor_asyncio.AsyncIOMotorCollection(database, name, codec_options=None, read_preference=None, write_concern=None, read_concern=None, _delegate=None)
c[name] || c.name

Get the name sub-collection of AsyncIOMotorCollection c.

Raises InvalidName if an invalid collection name is used.

database

The AsyncIOMotorDatabase that this AsyncIOMotorCollection is a part of.

coroutine create_index(keys, **kwargs)

Creates an index on this collection.

Takes either a single key or a list of (key, direction) pairs. The key(s) must be an instance of basestring (str in python 3), and the direction(s) must be one of (ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, HASHED, TEXT).

To create a single key ascending index on the key 'mike' we just use a string argument:

await my_collection.create_index("mike")

For a compound index on 'mike' descending and 'eliot' ascending we need to use a list of tuples:

await my_collection.create_index([("mike", pymongo.DESCENDING),
                                  ("eliot", pymongo.ASCENDING)])

All optional index creation parameters should be passed as keyword arguments to this method. For example:

await my_collection.create_index([("mike", pymongo.DESCENDING)],
                                 background=True)

Valid options include, but are not limited to:

  • name: custom name to use for this index - if none is given, a name will be generated.
  • unique: if True creates a uniqueness constraint on the index.
  • background: if True this index should be created in the background.
  • sparse: if True, omit from the index any documents that lack the indexed field.
  • bucketSize: for use with geoHaystack indexes. Number of documents to group together within a certain proximity to a given longitude and latitude.
  • min: minimum value for keys in a GEO2D index.
  • max: maximum value for keys in a GEO2D index.
  • expireAfterSeconds: <int> Used to create an expiring (TTL) collection. MongoDB will automatically delete documents from this collection after <int> seconds. The indexed field must be a UTC datetime or the data will not expire.
  • partialFilterExpression: A document that specifies a filter for a partial index.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.

See the MongoDB documentation for a full list of supported options by server version.

Warning

dropDups is not supported by MongoDB 3.0 or newer. The option is silently ignored by the server and unique index builds using the option will fail if a duplicate value is detected.

Note

partialFilterExpression requires server version >= 3.2

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.

Parameters:
  • keys: a single key or a list of (key, direction) pairs specifying the index to create
  • **kwargs (optional): any additional index creation options (see the above list) should be passed as keyword arguments

See also

The MongoDB documentation on

indexes

coroutine inline_map_reduce(map, reduce, full_response=False, **kwargs)

Perform an inline map/reduce operation on this collection.

Perform the map/reduce operation on the server in RAM. A result collection is not created. The result set is returned as a list of documents.

If full_response is False (default) returns the result documents in a list. Otherwise, returns the full response from the server to the map reduce command.

The inline_map_reduce() method obeys the read_preference of this Collection.

Parameters:
  • map: map function (as a JavaScript string)

  • reduce: reduce function (as a JavaScript string)

  • full_response (optional): if True, return full response to this command - otherwise just return the result collection

  • **kwargs (optional): additional arguments to the map reduce command may be passed as keyword arguments to this helper method, e.g.:

    await db.test.inline_map_reduce(map, reduce, limit=2)
    

See also

The MongoDB documentation on

mapreduce

aggregate(pipeline, **kwargs)

Execute an aggregation pipeline on this collection.

The aggregation can be run on a secondary if the client is connected to a replica set and its read_preference is not PRIMARY.

Parameters:
  • pipeline: a single command or list of aggregation commands
  • session (optional): a ClientSession, created with start_session().
  • **kwargs: send arbitrary parameters to the aggregate command

Returns a MotorCommandCursor that can be iterated like a cursor from find():

async def f():
    pipeline = [{'$project': {'name': {'$toUpper': '$name'}}}]
    async for doc in collection.aggregate(pipeline):
        print(doc)

MotorCommandCursor does not allow the explain option. To explain MongoDB’s query plan for the aggregation, use MotorDatabase.command():

async def f():
    plan = await db.command(
        'aggregate', 'COLLECTION-NAME',
        pipeline=[{'$project': {'x': 1}}],
        explain=True)

    print(plan)

Changed in version 2.1: This collection’s read concern is now applied to pipelines containing the $out stage when connected to MongoDB >= 4.2.

Changed in version 1.0: aggregate() now always returns a cursor.

Changed in version 0.5: aggregate() now returns a cursor by default, and the cursor is returned immediately without an await. See aggregation changes in Motor 0.5.

Changed in version 0.2: Added cursor support.

aggregate_raw_batches(pipeline, **kwargs)

Perform an aggregation and retrieve batches of raw BSON.

Similar to the aggregate() method but returns each batch as bytes.

This example demonstrates how to work with raw batches, but in practice raw batches should be passed to an external library that can decode BSON into another data type, rather than used with PyMongo’s bson module.

async def get_raw():
    cursor = db.test.aggregate_raw_batches([{'$project': {'x': 1}}])
    async for batch in cursor:
        print(bson.decode_all(batch))

Note that aggregate_raw_batches does not support sessions.

New in version 2.0.

coroutine bulk_write(requests, ordered=True, bypass_document_validation=False, session=None)

Send a batch of write operations to the server.

Requests are passed as a list of write operation instances imported from pymongo: InsertOne, UpdateOne, UpdateMany, ReplaceOne, DeleteOne, or DeleteMany.

For example, say we have these documents:

{'x': 1, '_id': ObjectId('54f62e60fba5226811f634ef')}
{'x': 1, '_id': ObjectId('54f62e60fba5226811f634f0')}

We can insert a document, delete one, and replace one like so:

# DeleteMany, UpdateOne, and UpdateMany are also available.
from pymongo import InsertOne, DeleteOne, ReplaceOne

async def modify_data():
    requests = [InsertOne({'y': 1}), DeleteOne({'x': 1}),
                ReplaceOne({'w': 1}, {'z': 1}, upsert=True)]
    result = await db.test.bulk_write(requests)

    print("inserted %d, deleted %d, modified %d" % (
        result.inserted_count, result.deleted_count, result.modified_count))

    print("upserted_ids: %s" % result.upserted_ids)

    print("collection:")
    async for doc in db.test.find():
        print(doc)

This will print something like:

inserted 1, deleted 1, modified 0
upserted_ids: {2: ObjectId('54f62ee28891e756a6e1abd5')}

collection:
{'x': 1, '_id': ObjectId('54f62e60fba5226811f634f0')}
{'y': 1, '_id': ObjectId('54f62ee2fba5226811f634f1')}
{'z': 1, '_id': ObjectId('54f62ee28891e756a6e1abd5')}
Parameters:
  • requests: A list of write operations (see examples above).
  • ordered (optional): If True (the default) requests will be performed on the server serially, in the order provided. If an error occurs all remaining operations are aborted. If False requests will be performed on the server in arbitrary order, possibly in parallel, and all operations will be attempted.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of BulkWriteResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 1.2: Added session parameter.

coroutine count_documents(filter, session=None, **kwargs)

Count the number of documents in this collection.

Note

For a fast count of the total documents in a collection see estimated_document_count().

The count_documents() method is supported in a transaction.

All optional parameters should be passed as keyword arguments to this method. Valid options include:

  • skip (int): The number of matching documents to skip before returning results.
  • limit (int): The maximum number of documents to count. Must be a positive integer. If not provided, no limit is imposed.
  • maxTimeMS (int): The maximum amount of time to allow this operation to run, in milliseconds.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (string or list of tuples): The index to use. Specify either the index name as a string or the index specification as a list of tuples (e.g. [(‘a’, pymongo.ASCENDING), (‘b’, pymongo.ASCENDING)]). This option is only supported on MongoDB 3.6 and above.

The count_documents() method obeys the read_preference of this Collection.

Note

When migrating from count() to count_documents() the following query operators must be replaced:

Operator Replacement
$where $expr
$near $geoWithin with $center
$nearSphere $geoWithin with $centerSphere

$expr requires MongoDB 3.6+

Parameters:
  • filter (required): A query document that selects which documents to count in the collection. Can be an empty document to count all documents.
  • session (optional): a ClientSession.
  • **kwargs (optional): See list of options above.
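
For example, a minimal sketch counting matching documents with a limit (the collection name and filter are placeholders):

async def count_active():
    n = await db.users.count_documents({'active': True}, limit=1000)
    print('%d active users (capped at 1000)' % n)
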
coroutine create_indexes(indexes, session=None, **kwargs)

Create one or more indexes on this collection:

from pymongo import IndexModel, ASCENDING, DESCENDING

async def create_two_indexes():
    index1 = IndexModel([("hello", DESCENDING),
                         ("world", ASCENDING)], name="hello_world")
    index2 = IndexModel([("goodbye", DESCENDING)])
    print(await db.test.create_indexes([index1, index2]))

This prints:

['hello_world', 'goodbye_-1']
Parameters:
  • indexes: A list of IndexModel instances.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): optional arguments to the createIndexes command (like maxTimeMS) can be passed as keyword arguments.

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.

Changed in version 1.2: Added session parameter.

coroutine delete_many(filter, collation=None, hint=None, session=None)

Delete one or more documents matching the filter.

If we have a collection with 3 documents like {'x': 1}, then:

async def clear_collection():
    result = await db.test.delete_many({'x': 1})
    print(result.deleted_count)

This deletes all matching documents and prints “3”.

Parameters:
  • filter: A query that matches the documents to delete.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (optional): An index used to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of DeleteResult.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine delete_one(filter, collation=None, hint=None, session=None)

Delete a single document matching the filter.

If we have a collection with 3 documents like {'x': 1}, then:

async def clear_collection():
    result = await db.test.delete_one({'x': 1})
    print(result.deleted_count)

This deletes one matching document and prints “1”.

Parameters:
  • filter: A query that matches the document to delete.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (optional): An index used to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of DeleteResult.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine distinct(key, filter=None, session=None, **kwargs)

Get a list of distinct values for key among all documents in this collection.

Raises TypeError if key is not an instance of basestring (str in python 3).

All optional distinct parameters should be passed as keyword arguments to this method. Valid options include:

  • maxTimeMS (int): The maximum amount of time to allow the distinct command to run, in milliseconds.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.

The distinct() method obeys the read_preference of this Collection.

Parameters:
  • key: name of the field for which we want to get the distinct values
  • filter (optional): A query document that specifies the documents from which to retrieve the distinct values.
  • session (optional): a ClientSession.
  • **kwargs (optional): See list of options above.
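
A minimal sketch (the collection and field names are placeholders):

async def list_cities():
    # Distinct 'city' values among active users only.
    values = await db.users.distinct('city', {'active': True})
    print(values)
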
coroutine drop(session=None)

Alias for drop_collection.

The following two calls are equivalent:

await db.foo.drop()
await db.drop_collection("foo")

coroutine drop_index(index_or_name, session=None, **kwargs)

Drops the specified index on this collection.

Can be used on non-existent collections or collections with no indexes. Raises OperationFailure on an error (e.g. trying to drop an index that does not exist). index_or_name can be either an index name (as returned by create_index), or an index specifier (as passed to create_index). An index specifier should be a list of (key, direction) pairs. Raises TypeError if index is not an instance of (str, unicode, list).

Warning

if a custom name was used on index creation (by passing the name parameter to create_index() or ensure_index()) the index must be dropped by name.

Parameters:
  • index_or_name: index (or name of index) to drop
  • session (optional): a ClientSession.
  • **kwargs (optional): optional arguments to the dropIndexes command (like maxTimeMS) can be passed as keyword arguments.

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.

coroutine drop_indexes(session=None, **kwargs)

Drops all indexes on this collection.

Can be used on non-existent collections or collections with no indexes. Raises OperationFailure on an error.

Parameters:
  • session (optional): a ClientSession.
  • **kwargs (optional): optional arguments to the dropIndexes command (like maxTimeMS) can be passed as keyword arguments.

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.

coroutine estimated_document_count(**kwargs)

Get an estimate of the number of documents in this collection using collection metadata.

The estimated_document_count() method is not supported in a transaction.

All optional parameters should be passed as keyword arguments to this method. Valid options include:

  • maxTimeMS (int): The maximum amount of time to allow this operation to run, in milliseconds.
Parameters:
  • **kwargs (optional): See list of options above.
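
A minimal usage sketch; the estimate comes from collection metadata, so it can be stale but returns quickly:

async def quick_count():
    print(await db.test.estimated_document_count(maxTimeMS=1000))
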
find(*args, **kwargs)

Create a MotorCursor. Same parameters as for PyMongo’s find().

Note that find does not require an await expression, because find merely creates a MotorCursor without performing any operations on the server. MotorCursor methods such as to_list() perform actual operations.
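
For example, a minimal sketch chaining cursor modifiers before iterating (the collection and field names are placeholders):

async def print_top_users():
    cursor = db.users.find({'active': True}).sort('name').limit(10)
    async for doc in cursor:
        print(doc)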

coroutine find_one(filter=None, *args, **kwargs)

Get a single document from the database.

All arguments to find() are also valid arguments for find_one(), although any limit argument will be ignored. Returns a single document, or None if no matching document is found.

The find_one() method obeys the read_preference of this Motor collection instance.

Parameters:
  • filter (optional): a dictionary specifying the query to be performed OR any other type to be used as the value for a query for "_id".
  • *args (optional): any additional positional arguments are the same as the arguments to find().
  • **kwargs (optional): any additional keyword arguments are the same as the arguments to find().
  • max_time_ms (optional): a value for max_time_ms may be specified as part of **kwargs, e.g.:
await collection.find_one(max_time_ms=100)

Changed in version 1.2: Added session parameter.

coroutine find_one_and_delete(filter, projection=None, sort=None, hint=None, session=None, **kwargs)

Finds a single document and deletes it, returning the document.

If we have a collection with 2 documents like {'x': 1}, then this code retrieves and deletes one of them:

async def delete_one_document():
    print(await db.test.count_documents({'x': 1}))
    doc = await db.test.find_one_and_delete({'x': 1})
    print(doc)
    print(await db.test.count_documents({'x': 1}))

This outputs something like:

2
{'x': 1, '_id': ObjectId('54f4e12bfba5220aa4d6dee8')}
1

If multiple documents match filter, a sort can be applied. Say we have 3 documents like:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

This code retrieves and deletes the document with the largest _id:

async def delete_with_largest_id():
    doc = await db.test.find_one_and_delete(
        {'x': 1}, sort=[('_id', pymongo.DESCENDING)])

This deletes one document and prints it:

{'x': 1, '_id': 2}

The projection option can be used to limit the fields returned:

async def delete_and_return_x():
    print(await db.test.find_one_and_delete(
        {'x': 1}, projection={'_id': False}))

This prints:

{'x': 1}
Parameters:
  • filter: A query that matches the document to delete.
  • projection (optional): a list of field names that should be returned in the result document or a mapping specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a mapping to exclude fields from the result (e.g. projection={‘_id’: False}).
  • sort (optional): a list of (key, direction) pairs specifying the sort order for the query. If multiple documents match the query, they are sorted and the first is deleted.
  • hint (optional): An index used to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): additional command arguments can be passed as keyword arguments (for example maxTimeMS can be used with recent server versions).

This command uses the WriteConcern of this Collection when connected to MongoDB >= 3.2. Note that using an elevated write concern with this command may be slower compared to using the default write concern.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine find_one_and_replace(filter, replacement, projection=None, sort=None, upsert=False, return_document=False, hint=None, session=None, **kwargs)

Finds a single document and replaces it, returning either the original or the replaced document.

The find_one_and_replace() method differs from find_one_and_update() by replacing the document matched by filter, rather than modifying the existing document.

Say we have 3 documents like:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

Replace one of them like so:

async def replace_one_doc():
    original_doc = await db.test.find_one_and_replace({'x': 1}, {'y': 1})
    print("original: %s" % original_doc)
    print("collection:")
    async for doc in db.test.find():
        print(doc)

This will print:

original: {'x': 1, '_id': 0}
collection:
{'y': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}
Parameters:
  • filter: A query that matches the document to replace.
  • replacement: The replacement document.
  • projection (optional): A list of field names that should be returned in the result document or a mapping specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a mapping to exclude fields from the result (e.g. projection={‘_id’: False}).
  • sort (optional): a list of (key, direction) pairs specifying the sort order for the query. If multiple documents match the query, they are sorted and the first is replaced.
  • upsert (optional): When True, inserts a new document if no document matches the query. Defaults to False.
  • return_document: If ReturnDocument.BEFORE (the default), returns the original document before it was replaced, or None if no document matches. If ReturnDocument.AFTER, returns the replaced or inserted document.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): additional command arguments can be passed as keyword arguments (for example maxTimeMS can be used with recent server versions).

This command uses the WriteConcern of this Collection when connected to MongoDB >= 3.2. Note that using an elevated write concern with this command may be slower compared to using the default write concern.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine find_one_and_update(filter, update, projection=None, sort=None, upsert=False, return_document=False, array_filters=None, hint=None, session=None, **kwargs)

Finds a single document and updates it, returning either the original or the updated document. By default find_one_and_update() returns the original version of the document before the update was applied:

async def set_done():
    print(await db.test.find_one_and_update(
        {'_id': 665}, {'$inc': {'count': 1}, '$set': {'done': True}}))

This outputs:

{'_id': 665, 'done': False, 'count': 25}

To return the updated version of the document instead, use the return_document option.

from pymongo import ReturnDocument

async def increment_by_userid():
    print(await db.example.find_one_and_update(
        {'_id': 'userid'},
        {'$inc': {'seq': 1}},
        return_document=ReturnDocument.AFTER))

This prints:

{'_id': 'userid', 'seq': 1}

You can limit the fields returned with the projection option.

async def increment_by_userid():
    print(await db.example.find_one_and_update(
        {'_id': 'userid'},
        {'$inc': {'seq': 1}},
        projection={'seq': True, '_id': False},
        return_document=ReturnDocument.AFTER))

This results in:

{'seq': 2}

The upsert option can be used to create the document if it doesn’t already exist.

async def increment_by_userid():
    print(await db.example.find_one_and_update(
        {'_id': 'userid'},
        {'$inc': {'seq': 1}},
        projection={'seq': True, '_id': False},
        upsert=True,
        return_document=ReturnDocument.AFTER))

The result:

{'seq': 1}

If multiple documents match filter, a sort can be applied. Say we have these two documents:

{'_id': 665, 'done': True, 'result': {'count': 26}}
{'_id': 701, 'done': True, 'result': {'count': 17}}

Then to update the one with the greater _id:

async def set_done():
    print(await db.test.find_one_and_update(
        {'done': True},
        {'$set': {'final': True}},
        sort=[('_id', pymongo.DESCENDING)]))

This would print:

{'_id': 701, 'done': True, 'result': {'count': 17}}
Parameters:
  • filter: A query that matches the document to update.
  • update: The update operations to apply.
  • projection (optional): A list of field names that should be returned in the result document or a mapping specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a mapping to exclude fields from the result (e.g. projection={'_id': False}).
  • sort (optional): a list of (key, direction) pairs specifying the sort order for the query. If multiple documents match the query, they are sorted and the first is updated.
  • upsert (optional): When True, inserts a new document if no document matches the query. Defaults to False.
  • return_document: If ReturnDocument.BEFORE (the default), returns the original document before it was updated, or None if no document matches. If ReturnDocument.AFTER, returns the updated or inserted document.
  • array_filters (optional): A list of filters specifying which array elements an update should apply. Requires MongoDB 3.6+.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.4 and above.
  • session (optional): a ClientSession, created with start_session().
  • **kwargs (optional): additional command arguments can be passed as keyword arguments (for example maxTimeMS can be used with recent server versions).

This command uses the WriteConcern of this Collection when connected to MongoDB >= 3.2. Note that using an elevated write concern with this command may be slower compared to using the default write concern.

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added array_filters and session parameters.

find_raw_batches(*args, **kwargs)

Query the database and retrieve batches of raw BSON.

Similar to the find() method but returns each batch as bytes.

This example demonstrates how to work with raw batches, but in practice raw batches should be passed to an external library that can decode BSON into another data type, rather than used with PyMongo’s bson module.

async def get_raw():
    cursor = db.test.find_raw_batches()
    async for batch in cursor:
        print(bson.decode_all(batch))

Note that find_raw_batches does not support sessions.

New in version 2.0.

coroutine index_information(session=None)

Get information on this collection’s indexes.

Returns a dictionary where the keys are index names (as returned by create_index()) and the values are dictionaries containing information about each index. The dictionary is guaranteed to contain at least a single key, "key" which is a list of (key, direction) pairs specifying the index (as passed to create_index()). It will also contain any other metadata about the indexes, except for the "ns" and "name" keys, which are cleaned. For example:

async def create_x_index():
    print(await db.test.create_index("x", unique=True))
    print(await db.test.index_information())

This prints:

'x_1'
{'_id_': {'key': [('_id', 1)]},
 'x_1': {'unique': True, 'key': [('x', 1)]}}

Changed in version 1.2: Added session parameter.

coroutine insert_many(documents, ordered=True, bypass_document_validation=False, session=None)

Insert an iterable of documents.

async def insert_2_docs():
    result = await db.test.insert_many([{'x': i} for i in range(2)])
    print(result.inserted_ids)

This prints something like:

[ObjectId('54f113fffba522406c9cc20e'), ObjectId('54f113fffba522406c9cc20f')]
Parameters:
  • documents: An iterable of documents to insert.
  • ordered (optional): If True (the default) documents will be inserted on the server serially, in the order provided. If an error occurs all remaining inserts are aborted. If False, documents will be inserted on the server in arbitrary order, possibly in parallel, and all document inserts will be attempted.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of InsertManyResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 1.2: Added session parameter.

coroutine insert_one(document, bypass_document_validation=False, session=None)

Insert a single document.

async def insert_x():
    result = await db.test.insert_one({'x': 1})
    print(result.inserted_id)

This code outputs the new document’s _id:

ObjectId('54f112defba522406c9cc208')
Parameters:
  • document: The document to insert. Must be a mutable mapping type. If the document does not have an _id field one will be added automatically.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of InsertOneResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 1.2: Added session parameter.

list_indexes(session=None)

Get a cursor over the index documents for this collection.

async def print_indexes():
    async for index in db.test.list_indexes():
        print(index)

If the only index is the default index on _id, this might print:

SON([('v', 1), ('key', SON([('_id', 1)])), ('name', '_id_')])
coroutine map_reduce(map, reduce, out, full_response=False, session=None, **kwargs)

Perform a map/reduce operation on this collection.

If full_response is False (default) returns a MotorCollection instance containing the results of the operation. Otherwise, returns the full response from the server to the map reduce command.

Parameters:
  • map: map function (as a JavaScript string)

  • reduce: reduce function (as a JavaScript string)

  • out: output collection name or out object (dict). See the map reduce command documentation for available options. Note: out options are order sensitive. SON can be used to specify multiple options. e.g. SON([('replace', <collection name>), ('db', <database name>)])

  • full_response (optional): if True, return full response to this command - otherwise just return the result collection

  • session (optional): a ClientSession, created with start_session().

  • **kwargs (optional): additional arguments to the map reduce command may be passed as keyword arguments to this helper method, e.g.:

    result = await db.test.map_reduce(map, reduce, "myresults", limit=2)
    

Returns a Future.

Note

The map_reduce() method does not obey the read_preference of this MotorCollection. To run mapReduce on a secondary use the inline_map_reduce() method instead.

See also

The MongoDB documentation on

mapreduce

Changed in version 1.2: Added session parameter.
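
As a minimal sketch (collection, field, and output names are illustrative), counting documents per value of x with JavaScript map and reduce functions:

async def count_by_x():
    mapper = "function () { emit(this.x, 1); }"
    reducer = "function (key, values) { return Array.sum(values); }"
    # With full_response=False (the default) this returns a MotorCollection.
    result_coll = await db.test.map_reduce(mapper, reducer, "x_counts")
    async for doc in result_coll.find():
        print(doc)  # e.g. {'_id': 1.0, 'value': 3.0}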

coroutine options(session=None)

Get the options set on this collection.

Returns a dictionary of options and their values - see create_collection() for more information on the possible options. Returns an empty dictionary if the collection has not been created yet.

Parameters:
  • session (optional): a ClientSession, created with start_session().
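
As a minimal sketch (the capped collection is illustrative):

async def show_options():
    await db.create_collection('log', capped=True, size=4096)
    print(await db.log.options())  # e.g. {'capped': True, 'size': 4096}
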
coroutine reindex(session=None, **kwargs)

DEPRECATED: Rebuild all indexes on this collection.

Deprecated. Use command() to run the reIndex command directly instead:

await db.command({"reIndex": "<collection_name>"})

Note

Starting in MongoDB 4.6, the reIndex command can only be run when connected to a standalone mongod.

Parameters:
  • session (optional): a MotorClientSession.
  • **kwargs (optional): optional arguments to the reIndex command (like maxTimeMS) can be passed as keyword arguments.

Warning

reindex blocks all other operations (indexes are built in the foreground) and will be slow for large collections.

Changed in version 2.2: Deprecated.

coroutine rename(new_name, session=None, **kwargs)

Rename this collection.

If operating in auth mode, client must be authorized as an admin to perform this operation. Raises TypeError if new_name is not an instance of basestring (str in python 3). Raises InvalidName if new_name is not a valid collection name.

Parameters:
  • new_name: new name for this collection
  • session (optional): a ClientSession.
  • **kwargs (optional): additional arguments to the rename command may be passed as keyword arguments to this helper method (i.e. dropTarget=True)

Note

The write_concern of this collection is automatically applied to this operation when using MongoDB >= 3.4.
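
A minimal sketch (names are illustrative):

async def rename_test_collection():
    # Rename 'test' to 'test_archive', dropping any existing 'test_archive'.
    await db.test.rename('test_archive', dropTarget=True)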

coroutine replace_one(filter, replacement, upsert=False, bypass_document_validation=False, collation=None, hint=None, session=None)

Replace a single document matching the filter.

Say our collection has one document:

{'x': 1, '_id': ObjectId('54f4c5befba5220aa4d6dee7')}

Then to replace it with another:

async def replace_x_with_y():
    result = await db.test.replace_one({'x': 1}, {'y': 1})
    print('matched %d, modified %d' %
        (result.matched_count, result.modified_count))

    print('collection:')
    async for doc in db.test.find():
        print(doc)

This prints:

matched 1, modified 1
collection:
{'y': 1, '_id': ObjectId('54f4c5befba5220aa4d6dee7')}

The upsert option can be used to insert a new document if a matching document does not exist:

async def replace_or_upsert():
    result = await db.test.replace_one({'x': 1}, {'x': 1}, True)
    print('matched %d, modified %d, upserted_id %r' %
        (result.matched_count, result.modified_count, result.upserted_id))

    print('collection:')
    async for doc in db.test.find():
        print(doc)

This prints:

matched 0, modified 0, upserted_id ObjectId('54f11e5c8891e756a6e1abd4')
collection:
{'y': 1, '_id': ObjectId('54f4c5befba5220aa4d6dee7')}
{'x': 1, '_id': ObjectId('54f11e5c8891e756a6e1abd4')}
Parameters:
  • filter: A query that matches the document to replace.
  • replacement: The new document.
  • upsert (optional): If True, perform an insert if no documents match the filter.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.2 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of UpdateResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added session parameter.

coroutine update_many(filter, update, upsert=False, array_filters=None, bypass_document_validation=False, collation=None, hint=None, session=None)

Update one or more documents that match the filter.

Say our collection has 3 documents:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

We can add 3 to each “x” field:

async def add_3_to_x():
    result = await db.test.update_many({'x': 1}, {'$inc': {'x': 3}})
    print('matched %d, modified %d' %
          (result.matched_count, result.modified_count))

    print('collection:')
    async for doc in db.test.find():
        print(doc)

This prints:

matched 3, modified 3
collection:
{'x': 4, '_id': 0}
{'x': 4, '_id': 1}
{'x': 4, '_id': 2}
Parameters:
  • filter: A query that matches the documents to update.
  • update: The modifications to apply.
  • upsert (optional): If True, perform an insert if no documents match the filter.
  • bypass_document_validation (optional): If True, allows the write to opt-out of document level validation. Default is False.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • array_filters (optional): A list of filters specifying which array elements an update should apply. Requires MongoDB 3.6+.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.2 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of UpdateResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added array_filters and session parameters.

coroutine update_one(filter, update, upsert=False, bypass_document_validation=False, collation=None, array_filters=None, hint=None, session=None)

Update a single document matching the filter.

Say our collection has 3 documents:

{'x': 1, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}

We can add 3 to the “x” field of one of the documents:

async def add_3_to_x():
    result = await db.test.update_one({'x': 1}, {'$inc': {'x': 3}})
    print('matched %d, modified %d' %
          (result.matched_count, result.modified_count))

    print('collection:')
    async for doc in db.test.find():
        print(doc)

This prints:

matched 1, modified 1
collection:
{'x': 4, '_id': 0}
{'x': 1, '_id': 1}
{'x': 1, '_id': 2}
Parameters:
  • filter: A query that matches the document to update.
  • update: The modifications to apply.
  • upsert (optional): If True, perform an insert if no documents match the filter.
  • bypass_document_validation: (optional) If True, allows the write to opt-out of document level validation. Default is False.
  • collation (optional): An instance of Collation. This option is only supported on MongoDB 3.4 and above.
  • array_filters (optional): A list of filters specifying which array elements an update should apply. Requires MongoDB 3.6+. See the sketch after this entry.
  • hint (optional): An index to use to support the query predicate specified either by its string name, or in the same format as passed to create_index() (e.g. [('field', ASCENDING)]). This option is only supported on MongoDB 4.2 and above.
  • session (optional): a ClientSession, created with start_session().
Returns:

An instance of UpdateResult.

Note

bypass_document_validation requires server version >= 3.2

Changed in version 2.2: Added hint parameter.

Changed in version 1.2: Added array_filters and session parameters.
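
As a hedged sketch of the array_filters option (document shape is illustrative), this increments only the grades array elements below 60:

async def curve_failing_grades():
    await db.test.insert_one({'_id': 1, 'grades': [95, 92, 30]})
    # '$[g]' updates only the elements matched by the 'g' array filter.
    await db.test.update_one(
        {'_id': 1},
        {'$inc': {'grades.$[g]': 10}},
        array_filters=[{'g': {'$lt': 60}}])
    print(await db.test.find_one({'_id': 1}))  # {'_id': 1, 'grades': [95, 92, 40]}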

watch(pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None)

Watch changes on this collection.

Performs an aggregation with an implicit initial $changeStream stage and returns a MotorChangeStream cursor which iterates over changes on this collection.

Introduced in MongoDB 3.6.

A change stream continues waiting indefinitely for matching change events. Code like the following allows a program to cancel the change stream and exit.

change_stream = None

async def watch_collection():
    global change_stream

    # Using the change stream in an "async with" block
    # ensures it is canceled promptly if your code breaks
    # from the loop or throws an exception.
    async with db.collection.watch() as change_stream:
        async for change in change_stream:
            print(change)

# Tornado
from tornado.ioloop import IOLoop

def main():
    loop = IOLoop.current()
    # Start watching collection for changes.
    loop.add_callback(watch_collection)
    try:
        loop.start()
    except KeyboardInterrupt:
        pass
    finally:
        if change_stream is not None:
            change_stream.close()

# asyncio
from asyncio import get_event_loop

def main():
    loop = get_event_loop()
    task = loop.create_task(watch_collection())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        if change_stream is not None:
            change_stream.close()

        # Prevent "Task was destroyed but it is pending!"
        loop.run_until_complete(task)

The MotorChangeStream async iterable blocks until the next change document is returned or an error is raised. If the next() method encounters a network error when retrieving a batch from the server, it will automatically attempt to recreate the cursor such that no change events are missed. Any error encountered during the resume attempt indicates there may be an outage and will be raised.

try:
    pipeline = [{'$match': {'operationType': 'insert'}}]
    async with db.collection.watch(pipeline) as stream:
        async for change in stream:
            print(change)
except pymongo.errors.PyMongoError:
    # The ChangeStream encountered an unrecoverable error or the
    # resume attempt failed to recreate the cursor.
    logging.error('...')

For a precise description of the resume process see the change streams specification.

Parameters:
  • pipeline (optional): A list of aggregation pipeline stages to append to an initial $changeStream stage. Not all pipeline stages are valid after a $changeStream stage, see the MongoDB documentation on change streams for the supported stages.
  • full_document (optional): The fullDocument option to pass to the $changeStream stage. Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  • resume_after (optional): A resume token. If provided, the change stream will start returning changes that occur directly after the operation specified in the resume token. A resume token is the _id value of a change document.
  • max_await_time_ms (optional): The maximum time in milliseconds for the server to wait for changes before responding to a getMore operation.
  • batch_size (optional): The maximum number of documents to return per batch.
  • collation (optional): The Collation to use for the aggregation.
  • start_at_operation_time (optional): If provided, the resulting change stream will only return changes that occurred at or after the specified Timestamp. Requires MongoDB >= 4.0.
  • session (optional): a ClientSession.
  • start_after (optional): The same as resume_after except that start_after can resume notifications after an invalidate event. This option and resume_after are mutually exclusive.
Returns:

A MotorChangeStream.

See the Tornado Change Stream Example.

Changed in version 2.1: Added the start_after parameter.

New in version 1.2.

See also

The MongoDB documentation on

changeStreams

with_options(codec_options=None, read_preference=None, write_concern=None, read_concern=None)

Get a clone of this collection changing the specified settings.

>>> coll1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> coll2 = coll1.with_options(read_preference=ReadPreference.SECONDARY)
>>> coll1.read_preference
Primary()
>>> coll2.read_preference
Secondary(tag_sets=None)
Parameters:
  • codec_options (optional): An instance of CodecOptions. If None (the default) the codec_options of this Collection is used.
  • read_preference (optional): The read preference to use. If None (the default) the read_preference of this Collection is used. See read_preferences for options.
  • write_concern (optional): An instance of WriteConcern. If None (the default) the write_concern of this Collection is used.
  • read_concern (optional): An instance of ReadConcern. If None (the default) the read_concern of this Collection is used.
codec_options

Read only access to the CodecOptions of this instance.

full_name

The full name of this Collection.

The full name is of the form database_name.collection_name.

name

The name of this Collection.

read_concern

Read only access to the ReadConcern of this instance.

New in version 3.2.

read_preference

Read only access to the read preference of this instance.

Changed in version 3.0: The read_preference attribute is now read only.

write_concern

Read only access to the WriteConcern of this instance.

Changed in version 3.0: The write_concern attribute is now read only.

AsyncIOMotorChangeStream

class motor.motor_asyncio.AsyncIOMotorChangeStream(target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, start_at_operation_time, session, start_after)
close()

Close this change stream.

Stops any “async for” loops using this change stream.

next()

Advance the cursor.

This method blocks until the next change document is returned or an unrecoverable error is raised. This method is used when iterating over all changes in the cursor. For example:

async def watch_collection():
    resume_token = None
    pipeline = [{'$match': {'operationType': 'insert'}}]
    try:
        async with db.collection.watch(pipeline) as stream:
            async for insert_change in stream:
                print(insert_change)
                resume_token = stream.resume_token
    except pymongo.errors.PyMongoError:
        # The ChangeStream encountered an unrecoverable error or the
        # resume attempt failed to recreate the cursor.
        if resume_token is None:
            # There is no usable resume token because there was a
            # failure during ChangeStream initialization.
            logging.error('...')
        else:
            # Use the interrupted ChangeStream's resume token to
            # create a new ChangeStream. The new stream will
            # continue from the last seen insert change without
            # missing any events.
            async with db.collection.watch(
                    pipeline, resume_after=resume_token) as stream:
                async for insert_change in stream:
                    print(insert_change)

Raises StopAsyncIteration if this change stream is closed.

In addition to using an “async for” loop as shown in the code example above, you can also iterate the change stream by calling await change_stream.next() repeatedly.

try_next()

Advance the cursor without blocking indefinitely.

This method returns the next change document without waiting indefinitely for the next change. If no changes are available, it returns None. For example:

while change_stream.alive:
    change = await change_stream.try_next()
    # Note that the ChangeStream's resume token may be updated
    # even when no changes are returned.
    print("Current resume token: %r" % (change_stream.resume_token,))
    if change is not None:
        print("Change document: %r" % (change,))
        continue
    # We end up here when there are no recent changes.
    # Sleep for a while before trying again to avoid flooding
    # the server with getMore requests when no changes are
    # available.
    await asyncio.sleep(10)

If no change document is cached locally then this method runs a single getMore command. If the getMore yields any documents, the next document is returned, otherwise, if the getMore returns no documents (because there have been no changes) then None is returned.

Returns: The next change document, or None when no document is available after running a single getMore or when the cursor is closed.

New in version 2.1.

alive

Does this cursor have the potential to return more data?

Note

Even if alive is True, next() can raise StopAsyncIteration and try_next() can return None.

resume_token

The cached resume token that will be used to resume after the most recently returned change.

New in version 3.9.

AsyncIOMotorCursor

class motor.motor_asyncio.AsyncIOMotorCursor(cursor, collection)

Don’t construct a cursor yourself, but acquire one from methods like MotorCollection.find() or MotorCollection.aggregate().

Note

There is no need to manually close cursors; they are closed by the server after being fully iterated with to_list(), each(), or async for, or automatically closed by the client when the MotorCursor is cleaned up by the garbage collector.

add_option(mask)

Set arbitrary query flags using a bitmask.

To set the tailable flag: cursor.add_option(2)

allow_disk_use(allow_disk_use)

Specifies whether MongoDB can use temporary disk files while processing a blocking sort operation.

Raises TypeError if allow_disk_use is not a boolean.

Note

allow_disk_use requires server version >= 4.4

Parameters:
  • allow_disk_use: if True, MongoDB may use temporary disk files to store data exceeding the system memory limit while processing a blocking sort operation.
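
A minimal sketch (field name is illustrative):

async def large_sort():
    # Let the server spill to temporary disk files if the sort
    # exceeds the system memory limit.
    cursor = db.test.find().sort('field_name').allow_disk_use(True)
    async for doc in cursor:
        print(doc)
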
clone()

Get a clone of this cursor.

close()

Explicitly kill this cursor on the server.

Call like:

await cursor.close()
collation(collation)

Adds a Collation to this query.

This option is only supported on MongoDB 3.4 and above.

Raises TypeError if collation is not an instance of Collation or a dict. Raises InvalidOperation if this Cursor has already been used. Only the last collation applied to this cursor has any effect.

Parameters:
  • collation: An instance of Collation.
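
A hedged sketch of a case-insensitive match (locale and data are illustrative):

from pymongo.collation import Collation

async def find_peter_any_case():
    # strength=1 compares base characters only, ignoring case and diacritics.
    cursor = db.test.find({'name': 'peter'}).collation(
        Collation(locale='en_US', strength=1))
    async for doc in cursor:
        print(doc)  # matches 'peter', 'Peter', 'PETER', ...
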
comment(comment)

Adds a ‘comment’ to the cursor.

http://docs.mongodb.org/manual/reference/operator/comment/

Parameters:
  • comment: A string to attach to the query to help interpret and trace the operation in the server logs and in profile data.
coroutine distinct(key)

Get a list of distinct values for key among all documents in the result set of this query.

Raises TypeError if key is not an instance of basestring (str in python 3).

The distinct() method obeys the read_preference of the Collection instance on which find() was called.

Parameters:
  • key: name of key for which we want to get the distinct values
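
A minimal sketch (query and key are illustrative):

async def distinct_x_values():
    values = await db.test.find({'x': {'$gt': 0}}).distinct('x')
    print(values)  # e.g. [1, 4]
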
each(callback)

Iterates over all the documents for this cursor.

each() returns immediately, and callback is executed asynchronously for each document. callback is passed (None, None) when iteration is complete.

Cancel iteration early by returning False from the callback. (Only False cancels iteration: returning None or 0 does not.)

>>> def each(result, error):
...     if error:
...         raise error
...     elif result:
...         sys.stdout.write(str(result['_id']) + ', ')
...     else:
...         # Iteration complete
...         IOLoop.current().stop()
...         print('done')
...
>>> cursor = collection.find().sort([('_id', 1)])
>>> cursor.each(callback=each)
>>> IOLoop.current().start()
0, 1, 2, 3, 4, done

Note

Unlike other Motor methods, each requires a callback and does not return a Future, so it cannot be used in a coroutine. async for and to_list() are much easier to use.

Parameters:
  • callback: function taking (document, error)
coroutine explain()

Returns an explain plan record for this cursor.

Note

Starting with MongoDB 3.2 explain() uses the default verbosity mode of the explain command, allPlansExecution. To use a different verbosity use command() to run the explain command directly.

See also

The MongoDB documentation on

explain
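
A minimal sketch:

async def explain_query():
    plan = await db.test.find({'x': 1}).explain()
    print(plan)  # a document describing the chosen query plan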

hint(index)

Adds a ‘hint’, telling Mongo the proper index to use for the query.

Judicious use of hints can greatly improve query performance. When doing a query on multiple fields (at least one of which is indexed) pass the indexed field as a hint to the query. Raises OperationFailure if the provided hint requires an index that does not exist on this collection, and raises InvalidOperation if this cursor has already been used.

index should be an index as passed to create_index() (e.g. [('field', ASCENDING)]) or the name of the index. If index is None any existing hint for this query is cleared. The last hint applied to this cursor takes precedence over all others.

Parameters:
  • index: index to hint on (as an index specifier)
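
A hedged sketch, assuming an index on 'a' already exists:

import pymongo

async def hinted_query():
    cursor = db.test.find({'a': 5, 'b': {'$gte': 1}}).hint([('a', pymongo.ASCENDING)])
    async for doc in cursor:
        print(doc)
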
limit(limit)

Limits the number of results to be returned by this cursor.

Raises TypeError if limit is not an integer. Raises InvalidOperation if this Cursor has already been used. The last limit applied to this cursor takes precedence. A limit of 0 is equivalent to no limit.

Parameters:
  • limit: the number of results to return

See also

The MongoDB documentation on

limit

max(spec)

Adds the max operator that specifies the upper bound for a specific index.

When using max, hint() should also be configured to ensure the query uses the expected index and starting in MongoDB 4.2 hint() will be required.

Parameters:
  • spec: a list of field, limit pairs specifying the exclusive upper bound for all keys of a specific index in order.
max_await_time_ms(max_await_time_ms)

Specifies a time limit for a getMore operation on a TAILABLE_AWAIT cursor. For all other types of cursor max_await_time_ms is ignored.

Raises TypeError if max_await_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.

Note

max_await_time_ms requires server version >= 3.2

Parameters:
  • max_await_time_ms: the time limit after which the operation is aborted
max_scan(max_scan)

DEPRECATED - Limit the number of documents to scan when performing the query.

Raises InvalidOperation if this cursor has already been used. Only the last max_scan() applied to this cursor has any effect.

Parameters:
  • max_scan: the maximum number of documents to scan
max_time_ms(max_time_ms)

Specifies a time limit for a query operation. If the specified time is exceeded, the operation will be aborted and ExecutionTimeout is raised. If max_time_ms is None no limit is applied.

Raises TypeError if max_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.

Parameters:
  • max_time_ms: the time limit after which the operation is aborted
min(spec)

Adds the min operator that specifies the lower bound for a specific index.

When using min, hint() should also be configured to ensure the query uses the expected index and starting in MongoDB 4.2 hint() will be required.

Parameters:
  • spec: a list of field, limit pairs specifying the inclusive lower bound for all keys of a specific index in order.
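
A hedged sketch combining min() and max() with hint(), assuming an ascending index on 'x':

import pymongo

async def bounded_index_scan():
    # Scan only the index range 100 <= x < 200
    # (min is inclusive, max is exclusive).
    cursor = (db.test.find()
              .hint([('x', pymongo.ASCENDING)])
              .min([('x', 100)])
              .max([('x', 200)]))
    async for doc in cursor:
        print(doc)
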
next()

Advance the cursor.

New in version 2.2.

next_object()

DEPRECATED - Get a document from the most recently fetched batch, or None. See fetch_next.

The next_object() method is deprecated and will be removed in Motor 3.0. Use async for to elegantly iterate over MotorCursor objects instead.

Changed in version 2.2: Deprecated.

remove_option(mask)

Unset arbitrary query flags using a bitmask.

To unset the tailable flag: cursor.remove_option(2)

rewind()

Rewind this cursor to its unevaluated state.

skip(skip)

Skips the first skip results of this cursor.

Raises TypeError if skip is not an integer. Raises ValueError if skip is less than 0. Raises InvalidOperation if this Cursor has already been used. The last skip applied to this cursor takes precedence.

Parameters:
  • skip: the number of results to skip
sort(key_or_list, direction=None)

Sorts this cursor’s results.

Pass a field name and a direction, either ASCENDING or DESCENDING:

>>> async def f():
...     cursor = collection.find().sort('_id', pymongo.DESCENDING)
...     docs = await cursor.to_list(None)
...     print([d['_id'] for d in docs])
...
>>> IOLoop.current().run_sync(f)
[4, 3, 2, 1, 0]

To sort by multiple fields, pass a list of (key, direction) pairs:

>>> async def f():
...     cursor = collection.find().sort([
...         ('field1', pymongo.ASCENDING),
...         ('field2', pymongo.DESCENDING)])
...
...     docs = await cursor.to_list(None)
...     print([(d['field1'], d['field2']) for d in docs])
...
>>> IOLoop.current().run_sync(f)
[(0, 4), (0, 2), (0, 0), (1, 3), (1, 1)]

Text search results can be sorted by relevance:

>>> async def f():
...     cursor = collection.find({
...         '$text': {'$search': 'some words'}},
...         {'score': {'$meta': 'textScore'}})
...
...     # Sort by 'score' field.
...     cursor.sort([('score', {'$meta': 'textScore'})])
...     async for doc in cursor:
...         print('%.1f %s' % (doc['score'], doc['field']))
...
>>> IOLoop.current().run_sync(f)
1.5 words about some words
1.0 words

Raises InvalidOperation if this cursor has already been used. Only the last sort() applied to this cursor has any effect.

Parameters:
  • key_or_list: a single key or a list of (key, direction) pairs specifying the keys to sort on
  • direction (optional): only used if key_or_list is a single key, if not given ASCENDING is assumed
coroutine to_list(length)

Get a list of documents.

>>> from motor.motor_tornado import MotorClient
>>> collection = MotorClient().test.test_collection
>>>
>>> async def f():
...     cursor = collection.find().sort([('_id', 1)])
...     docs = await cursor.to_list(length=2)
...     while docs:
...         print(docs)
...         docs = await cursor.to_list(length=2)
...
...     print('done')
...
>>> IOLoop.current().run_sync(f)
[{'_id': 0}, {'_id': 1}]
[{'_id': 2}, {'_id': 3}]
done
Parameters:
  • length: maximum number of documents to return for this call, or None

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: callback must be passed as a keyword argument, like to_list(10, callback=callback), and the length parameter is no longer optional.

where(code)

Adds a $where clause to this query.

The code argument must be an instance of str or Code containing a JavaScript expression. This expression will be evaluated for each document scanned. Only those documents for which the expression evaluates to true will be returned as results. The keyword this refers to the object currently being scanned. For example:

# Find all documents where field "a" is less than "b" plus "c".
async for doc in db.test.find().where('this.a < (this.b + this.c)'):
    print(doc)

Raises TypeError if code is not an instance of str. Raises InvalidOperation if this MotorCursor has already been used. Only the last call to where() applied to a MotorCursor has any effect.

Note

MongoDB 4.4 drops support for Code with scope variables. Consider using $expr instead.

Parameters:
  • code: JavaScript expression to use as a filter
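
As a hedged sketch, the same query written with $expr (supported by MongoDB 3.6+) instead of server-side JavaScript, as the note above suggests:

# Find all documents where field "a" is less than "b" plus "c".
async for doc in db.test.find({'$expr': {'$lt': ['$a', {'$add': ['$b', '$c']}]}}):
    print(doc)
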
address

The (host, port) of the server used, or None.

Changed in version 3.0: Renamed from “conn_id”.

alive

Does this cursor have the potential to return more data?

This is mostly useful with tailable cursors since they will stop iterating even though they may return more results in the future.

With regular cursors, simply use an async for loop instead of alive:

async for doc in collection.find():
    print(doc)

Note

Even if alive is True, next() can raise StopAsyncIteration. alive can also be True while iterating a cursor from a failed server. In this case alive will return False after next() fails to retrieve the next batch of results from the server.

cursor_id

Returns the id of the cursor.

Useful if you need to manage cursor ids and want to handle killing cursors manually using kill_cursors().

New in version 2.2.

fetch_next

DEPRECATED - A Future used with gen.coroutine to asynchronously retrieve the next document in the result set, fetching a batch of documents from the server if necessary. Resolves to False if there are no more documents, otherwise next_object() is guaranteed to return a document:

Attention

The fetch_next property is deprecated and will be removed in Motor 3.0. Use async for to iterate elegantly and efficiently over MotorCursor objects instead.:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     async for doc in collection.find():
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

While it appears that fetch_next retrieves each document from the server individually, the cursor actually fetches documents efficiently in large batches. Example usage:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     cursor = collection.find().sort([('_id', 1)])
...     while (await cursor.fetch_next):
...         doc = cursor.next_object()
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

Changed in version 2.2: Deprecated.

session

The cursor’s ClientSession, or None.

New in version 3.6.

AsyncIOMotorCommandCursor

class motor.motor_asyncio.AsyncIOMotorCommandCursor(cursor, collection)

Don’t construct a cursor yourself, but acquire one from methods like MotorCollection.find() or MotorCollection.aggregate().

Note

There is no need to manually close cursors; they are closed by the server after being fully iterated with to_list(), each(), or async for, or automatically closed by the client when the MotorCursor is cleaned up by the garbage collector.

close()

Explicitly kill this cursor on the server.

Call like:

await cursor.close()
each(callback)

Iterates over all the documents for this cursor.

each() returns immediately, and callback is executed asynchronously for each document. callback is passed (None, None) when iteration is complete.

Cancel iteration early by returning False from the callback. (Only False cancels iteration: returning None or 0 does not.)

>>> def each(result, error):
...     if error:
...         raise error
...     elif result:
...         sys.stdout.write(str(result['_id']) + ', ')
...     else:
...         # Iteration complete
...         IOLoop.current().stop()
...         print('done')
...
>>> cursor = collection.find().sort([('_id', 1)])
>>> cursor.each(callback=each)
>>> IOLoop.current().start()
0, 1, 2, 3, 4, done

Note

Unlike other Motor methods, each requires a callback and does not return a Future, so it cannot be used in a coroutine. async for and to_list() are much easier to use.

Parameters:
  • callback: function taking (document, error)
next()

Advance the cursor.

New in version 2.2.

next_object()

DEPRECATED - Get a document from the most recently fetched batch, or None. See fetch_next.

The next_object() method is deprecated and will be removed in Motor 3.0. Use async for to elegantly iterate over MotorCursor objects instead.

Changed in version 2.2: Deprecated.

coroutine to_list(length)

Get a list of documents.

>>> from motor.motor_tornado import MotorClient
>>> collection = MotorClient().test.test_collection
>>>
>>> async def f():
...     cursor = collection.find().sort([('_id', 1)])
...     docs = await cursor.to_list(length=2)
...     while docs:
...         print(docs)
...         docs = await cursor.to_list(length=2)
...
...     print('done')
...
>>> IOLoop.current().run_sync(f)
[{'_id': 0}, {'_id': 1}]
[{'_id': 2}, {'_id': 3}]
done
Parameters:
  • length: maximum number of documents to return for this call, or None

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: callback must be passed as a keyword argument, like to_list(10, callback=callback), and the length parameter is no longer optional.

address

The (host, port) of the server used, or None.

New in version 3.0.

alive

Does this cursor have the potential to return more data?

Even if alive is True, next() can raise StopAsyncIteration. Best to use an async for loop:

async for doc in collection.aggregate(pipeline):
    print(doc)

Note

alive can be True while iterating a cursor from a failed server. In this case alive will return False after next() fails to retrieve the next batch of results from the server.

cursor_id

Returns the id of the cursor.

fetch_next

DEPRECATED - A Future used with gen.coroutine to asynchronously retrieve the next document in the result set, fetching a batch of documents from the server if necessary. Resolves to False if there are no more documents, otherwise next_object() is guaranteed to return a document:

Attention

The fetch_next property is deprecated and will be removed in Motor 3.0. Use async for to iterate elegantly and efficiently over MotorCursor objects instead.:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     async for doc in collection.find():
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

While it appears that fetch_next retrieves each document from the server individually, the cursor actually fetches documents efficiently in large batches. Example usage:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     cursor = collection.find().sort([('_id', 1)])
...     while (await cursor.fetch_next):
...         doc = cursor.next_object()
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

Changed in version 2.2: Deprecated.

session

The cursor’s ClientSession, or None.

New in version 3.6.

asyncio GridFS Classes

Store blobs of data in GridFS.

class motor.motor_asyncio.AsyncIOMotorGridFSBucket

Create a new instance of AsyncIOMotorGridFSBucket.

Raises TypeError if database is not an instance of AsyncIOMotorDatabase.

Raises ConfigurationError if write_concern is not acknowledged.

Parameters:
  • database: database to use.
  • bucket_name (optional): The name of the bucket. Defaults to ‘fs’.
  • chunk_size_bytes (optional): The chunk size in bytes. Defaults to 255KB.
  • write_concern (optional): The WriteConcern to use. If None (the default) db.write_concern is used.
  • read_preference (optional): The read preference to use. If None (the default) db.read_preference is used.

See also

The MongoDB documentation on

gridfs

coroutine delete(self, file_id)

Delete a file’s metadata and data chunks from a GridFS bucket:

async def delete():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    # Get _id of file to delete
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")
    await fs.delete(file_id)

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be deleted.
coroutine download_to_stream(self, file_id, destination)

Downloads the contents of the stored file specified by file_id and writes the contents to destination:

async def download():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    # Get _id of file to read
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")
    # Get file to write to
    file = open('myfile','wb+')
    await fs.download_to_stream(file_id, file)
    file.seek(0)
    contents = file.read()

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be downloaded.
  • destination: a file-like object implementing write().
coroutine download_to_stream_by_name(self, filename, destination, revision=-1)

Write the contents of filename (with optional revision) to destination.

For example:

async def download_by_name():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    # Get file to write to
    file = open('myfile','wb')
    await fs.download_to_stream_by_name("test_file", file)

Raises NoFile if no such version of that file exists.

Raises ValueError if filename is not a string.

Parameters:
  • filename: The name of the file to read from.
  • destination: A file-like object that implements write().
  • revision (optional): Which revision (documents with the same filename and different uploadDate) of the file to retrieve. Defaults to -1 (the most recent revision).
Note:

Revision numbers are defined as follows:

  • 0 = the original stored file
  • 1 = the first revision
  • 2 = the second revision
  • etc…
  • -2 = the second most recent revision
  • -1 = the most recent revision
find(self, *args, **kwargs)

Find and return the files collection documents that match filter.

Returns a cursor that iterates across files matching arbitrary queries on the files collection. Can be combined with other modifiers for additional control.

For example:

async def find():
    cursor = fs.find({"filename": "lisa.txt"},
                     no_cursor_timeout=True)

    async for grid_data in cursor:
        data = await grid_data.read()

iterates through all versions of “lisa.txt” stored in GridFS. Setting no_cursor_timeout may be important to prevent the cursor from timing out during long multi-file processing work.

As another example, the call:

most_recent_three = fs.find().sort("uploadDate", -1).limit(3)

returns a cursor to the three most recently uploaded files in GridFS.

Follows a similar interface to find() in AsyncIOMotorCollection.

Parameters:
  • filter: Search query.
  • batch_size (optional): The number of documents to return per batch.
  • limit (optional): The maximum number of documents to return.
  • no_cursor_timeout (optional): The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to True to prevent that.
  • skip (optional): The number of documents to skip before returning.
  • sort (optional): The order by which to sort results. Defaults to None.
coroutine open_download_stream(self, file_id)

Opens a stream to read the contents of the stored file specified by file_id:

async def download_stream():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    # get _id of file to read.
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")
    grid_out = await fs.open_download_stream(file_id)
    contents = await grid_out.read()

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be downloaded.

Returns an AsyncIOMotorGridOut.

coroutine open_download_stream_by_name(self, filename, revision=-1)

Opens a stream to read the contents of filename and optional revision:

async def download_by_name():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    # Create the file to read.
    await fs.upload_from_stream("test_file",
                                b"data I want to store!")
    grid_out = await fs.open_download_stream_by_name("test_file")
    contents = await grid_out.read()

Raises NoFile if no such version of that file exists.

Raises ValueError if filename is not a string.

Parameters:
  • filename: The name of the file to read from.
  • revision (optional): Which revision (documents with the same filename and different uploadDate) of the file to retrieve. Defaults to -1 (the most recent revision).

Returns an AsyncIOMotorGridOut.

Note:

Revision numbers are defined as follows:

  • 0 = the original stored file
  • 1 = the first revision
  • 2 = the second revision
  • etc…
  • -2 = the second most recent revision
  • -1 = the most recent revision
open_upload_stream(self, filename, chunk_size_bytes=None, metadata=None)

Opens a stream for writing.

Specify the filename, and add any additional information in the metadata field of the file document or modify the chunk size:

async def upload():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    grid_in = fs.open_upload_stream(
        "test_file", chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

    await grid_in.write(b"data I want to store!")
    await grid_in.close()  # uploaded on close

Returns an instance of AsyncIOMotorGridIn.

Raises NoFile if no such version of that file exists. Raises ValueError if filename is not a string.

In a Python 3.5 native coroutine, the “async with” statement calls close() automatically:

async def upload():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    async with fs.open_upload_stream("test_file") as gridin:
        await gridin.write(b'First part\n')
        await gridin.write(b'Second part')

    # gridin is now closed automatically.
Parameters:
  • filename: The name of the file to upload.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes in AsyncIOMotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.
open_upload_stream_with_id(self, file_id, filename, chunk_size_bytes=None, metadata=None)

Opens a stream for writing.

Specify the file_id and filename, and add any additional information in the metadata field of the file document, or modify the chunk size:

async def upload():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    grid_in = fs.open_upload_stream_with_id(
        ObjectId(),
        "test_file",
        chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

    await grid_in.write(b"data I want to store!")
    await grid_in.close()  # uploaded on close

Returns an instance of AsyncIOMotorGridIn.

Raises NoFile if no such version of that file exists. Raises ValueError if filename is not a string.

Parameters:
  • file_id: The id to use for this file. The id must not have already been used for another file.
  • filename: The name of the file to upload.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes in AsyncIOMotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.
coroutine rename(self, file_id, new_filename)

Renames the stored file with the specified file_id.

For example:

async def rename():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    # get _id of file to read.
    file_id = await fs.upload_from_stream("test_file",
                                          b"data I want to store!")

    await fs.rename(file_id, "new_test_name")

Raises NoFile if no file with file_id exists.

Parameters:
  • file_id: The _id of the file to be renamed.
  • new_filename: The new name of the file.
coroutine upload_from_stream(self, filename, source, chunk_size_bytes=None, metadata=None)

Uploads a user file to a GridFS bucket.

Reads the contents of the user file from source and uploads it to the file filename. Source can be a string or file-like object. For example:

async def upload_from_stream():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    file_id = await fs.upload_from_stream(
        "test_file",
        b"data I want to store!",
        chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

Raises NoFile if no such version of that file exists. Raises ValueError if filename is not a string.

Parameters:
  • filename: The name of the file to upload.
  • source: The source stream of the content to be uploaded. Must be a file-like object that implements read() or a string.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes of AsyncIOMotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.

Returns the _id of the uploaded file.

coroutine upload_from_stream_with_id(self, file_id, filename, source, chunk_size_bytes=None, metadata=None)

Uploads a user file to a GridFS bucket with a custom file id.

Reads the contents of the user file from source and uploads it to the file filename. Source can be a string or file-like object. For example:

async def upload_from_stream_with_id():
    my_db = AsyncIOMotorClient().test
    fs = AsyncIOMotorGridFSBucket(my_db)
    file_id = await fs.upload_from_stream_with_id(
        ObjectId(),
        "test_file",
        b"data I want to store!",
        chunk_size_bytes=4,
        metadata={"contentType": "text/plain"})

Raises NoFile if no such version of that file exists. Raises ValueError if filename is not a string.

Parameters:
  • file_id: The id to use for this file. The id must not have already been used for another file.
  • filename: The name of the file to upload.
  • source: The source stream of the content to be uploaded. Must be a file-like object that implements read() or a string.
  • chunk_size_bytes (optional): The number of bytes per chunk of this file. Defaults to the chunk_size_bytes of AsyncIOMotorGridFSBucket.
  • metadata (optional): User data for the ‘metadata’ field of the files collection document. If not provided the metadata field will be omitted from the files collection document.
class motor.motor_asyncio.AsyncIOMotorGridIn(root_collection, delegate=None, session=None, disable_md5=False, **kwargs)

Class to write data to GridFS. Application developers should not generally need to instantiate this class - see open_upload_stream().

Any of the file level options specified in the GridFS Spec may be passed as keyword arguments. Any additional keyword arguments will be set as additional fields on the file document. Valid keyword arguments include:

  • "_id": unique ID for this file (default: ObjectId) - this "_id" must not have already been used for another file
  • "filename": human name for the file
  • "contentType" or "content_type": valid mime-type for the file
  • "chunkSize" or "chunk_size": size of each of the chunks, in bytes (default: 256 kb)
  • "encoding": encoding used for this file. In Python 2, any unicode that is written to the file will be converted to a str. In Python 3, any str that is written to the file will be converted to bytes.
Parameters:
  • root_collection: root collection to write to
  • session (optional): a ClientSession to use for all commands
  • disable_md5 (optional): When True, an MD5 checksum will not be computed for the uploaded file. Useful in environments where MD5 cannot be used for regulatory or other reasons. Defaults to False.
  • **kwargs (optional): file level options (see above)

Changed in version 0.2: open method removed, no longer needed.

coroutine abort()

Remove all chunks/files that may have been uploaded and close.

coroutine close()

Flush the file and close it.

A closed file cannot be written any more. Calling close() more than once is allowed.

coroutine set(name, value)

Set an arbitrary metadata attribute on the file. Stores value on the server as a key-value pair within the file document once the file is closed. If the file is already closed, calling set() will immediately update the file document on the server.

Metadata set on the file appears as attributes on a MotorGridOut object created from the file.

Parameters:
  • name: Name of the attribute, will be stored as a key in the file document on the server
  • value: Value of the attribute
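
A minimal sketch (field name and value are illustrative):

async def upload_with_metadata():
    fs = AsyncIOMotorGridFSBucket(db)
    grid_in = fs.open_upload_stream('test_file')
    await grid_in.write(b'data I want to store!')
    await grid_in.set('category', 'reports')  # stored when the file is closed
    await grid_in.close()
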
coroutine write(data)

Write data to the file. There is no return value.

data can be either a string of bytes or a file-like object (implementing read()). If the file has an encoding attribute, data can also be a unicode (str in python 3) instance, which will be encoded as encoding before being written.

Due to buffering, the data may not actually be written to the database until the close() method is called. Raises ValueError if this file is already closed. Raises TypeError if data is not an instance of str (bytes in python 3), a file-like object, or an instance of unicode (str in python 3). Unicode data is only allowed if the file has an encoding attribute.

Parameters:
  • data: string of bytes or file-like object to be written to the file
coroutine writelines(sequence)

Write a sequence of strings to the file.

Does not add separators.

chunk_size

Chunk size for this file.

This attribute is read-only.

closed

Is this file closed?

content_type

Mime-type for this file.

filename

Name of this file.

length

Length (in bytes) of this file.

This attribute is read-only and can only be read after close() has been called.

md5

MD5 of the contents of this file if an md5 sum was created.

This attribute is read-only and can only be read after close() has been called.

name

Alias for filename.

upload_date

Date that this file was uploaded.

This attribute is read-only and can only be read after close() has been called.

class motor.motor_asyncio.AsyncIOMotorGridOut(root_collection, file_id=None, file_document=None, delegate=None, session=None)

Class to read data out of GridFS.

coroutine open()

Retrieve this file’s attributes from the server.

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: MotorGridOut now opens itself on demand, calling open explicitly is rarely needed.

coroutine read(size=-1)

Read at most size bytes from the file (less if there isn’t enough data).

The bytes are returned as an instance of str (bytes in python 3). If size is negative or omitted all data is read.
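
For example, a minimal sketch (fs is assumed to be an AsyncIOMotorGridFSBucket, file_id an existing file's _id):

async def read_file(fs, file_id):
    grid_out = await fs.open_download_stream(file_id)
    head = await grid_out.read(1024)  # at most 1024 bytes
    rest = await grid_out.read()      # size omitted: read the remaining data
    return head + rest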

Parameters:
  • size (optional): the number of bytes to read
coroutine readchunk()

Reads a chunk at a time. If the current position is within a chunk, the remainder of the chunk is returned.

coroutine readline(size=-1)

Read one line or up to size bytes from the file.

Parameters:
  • size (optional): the maximum number of bytes to read
stream_to_handler(request_handler)

Write the contents of this file to a tornado.web.RequestHandler. This method calls flush() on the RequestHandler, so ensure all headers have already been set. For a more complete example see the implementation of GridFSHandler.

class FileHandler(tornado.web.RequestHandler):
    async def get(self, filename):
        db = self.settings['db']
        fs = motor.MotorGridFSBucket(db)
        try:
            gridout = await fs.open_download_stream_by_name(filename)
        except gridfs.NoFile:
            raise tornado.web.HTTPError(404)

        self.set_header("Content-Type", gridout.content_type)
        self.set_header("Content-Length", gridout.length)
        await gridout.stream_to_handler(self)
        self.finish()

See also

Tornado RequestHandler

aliases

List of aliases for this file.

This attribute is read-only.

chunk_size

Chunk size for this file.

This attribute is read-only.

close

Make GridOut more generically file-like.

content_type

Mime-type for this file.

This attribute is read-only.

filename

Name of this file.

This attribute is read-only.

length

Length (in bytes) of this file.

This attribute is read-only.

md5

MD5 of the contents of this file if an md5 sum was created.

This attribute is read-only.

metadata

Metadata attached to this file.

This attribute is read-only.

name

Alias for filename.

This attribute is read-only.

seek

Set the current position of this file.
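
For example, a minimal sketch that reads the last n bytes of a file (note that seek and tell are synchronous; only read is awaited):

import os

async def read_tail(grid_out, n):
    grid_out.seek(-n, os.SEEK_END)  # position n bytes before the end
    return await grid_out.read(n)   # tell() would now report length - n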

Parameters:
  • pos: the position (or offset if using relative positioning) to seek to
  • whence (optional): where to seek from. os.SEEK_SET (0) for absolute file positioning, os.SEEK_CUR (1) to seek relative to the current position, os.SEEK_END (2) to seek relative to the file’s end.
tell

Return the current position of this file.

upload_date

Date that this file was first uploaded.

This attribute is read-only.

class motor.motor_asyncio.AsyncIOMotorGridOutCursor(cursor, collection)

Don’t construct a cursor yourself, but acquire one from methods like MotorCollection.find() or MotorCollection.aggregate().

Note

There is no need to manually close cursors; they are closed by the server after being fully iterated with to_list(), each(), or async for, or automatically closed by the client when the MotorCursor is cleaned up by the garbage collector.

allow_disk_use(allow_disk_use)

Specifies whether MongoDB can use temporary disk files while processing a blocking sort operation.

Raises TypeError if allow_disk_use is not a boolean.

Note

allow_disk_use requires server version >= 4.4
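
For example, a minimal sketch (the collection and the unindexed field 'score' are assumed):

import pymongo

cursor = collection.find().sort('score', pymongo.DESCENDING).allow_disk_use(True)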

Parameters:
  • allow_disk_use: if True, MongoDB may use temporary disk files to store data exceeding the system memory limit while processing a blocking sort operation.
clone()

Get a clone of this cursor.

close()

Explicitly kill this cursor on the server.

Call like:

await cursor.close()
collation(collation)

Adds a Collation to this query.

This option is only supported on MongoDB 3.4 and above.

Raises TypeError if collation is not an instance of Collation or a dict. Raises InvalidOperation if this Cursor has already been used. Only the last collation applied to this cursor has any effect.
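
For example, a minimal sketch of a case-insensitive match (the collection and the 'name' field are assumed):

from pymongo.collation import Collation

cursor = collection.find({'name': 'peter'}).collation(
    Collation(locale='en', strength=2))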

Parameters:
  • collation: An instance of Collation.
comment(comment)

Adds a ‘comment’ to the cursor.

http://docs.mongodb.org/manual/reference/operator/comment/

Parameters:
  • comment: A string to attach to the query to help interpret and trace the operation in the server logs and in profile data.
coroutine distinct(key)

Get a list of distinct values for key among all documents in the result set of this query.

Raises TypeError if key is not an instance of basestring (str in python 3).

The distinct() method obeys the read_preference of the Collection instance on which find() was called.
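
For example, a minimal sketch (the collection and field names are assumed):

async def active_states(collection):
    cursor = collection.find({'active': True})
    return await cursor.distinct('state')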

Parameters:
  • key: name of key for which we want to get the distinct values
each(callback)

Iterates over all the documents for this cursor.

each() returns immediately, and callback is executed asynchronously for each document. callback is passed (None, None) when iteration is complete.

Cancel iteration early by returning False from the callback. (Only False cancels iteration: returning None or 0 does not.)

>>> def each(result, error):
...     if error:
...         raise error
...     elif result:
...         sys.stdout.write(str(result['_id']) + ', ')
...     else:
...         # Iteration complete
...         IOLoop.current().stop()
...         print('done')
...
>>> cursor = collection.find().sort([('_id', 1)])
>>> cursor.each(callback=each)
>>> IOLoop.current().start()
0, 1, 2, 3, 4, done

Note

Unlike other Motor methods, each requires a callback and does not return a Future, so it cannot be used in a coroutine. async for and to_list() are much easier to use.

Parameters:
  • callback: function taking (document, error)
coroutine explain()

Returns an explain plan record for this cursor.

Note

Starting with MongoDB 3.2 explain() uses the default verbosity mode of the explain command, allPlansExecution. To use a different verbosity use command() to run the explain command directly.

See also

The MongoDB documentation on

explain
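
For example, a minimal sketch (the exact shape of the explain output varies by server version):

async def show_plan(collection):
    plan = await collection.find({'field': {'$gt': 10}}).explain()
    print(plan['queryPlanner']['winningPlan'])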

hint(index)

Adds a ‘hint’, telling Mongo the proper index to use for the query.

Judicious use of hints can greatly improve query performance. When doing a query on multiple fields (at least one of which is indexed) pass the indexed field as a hint to the query. Raises OperationFailure if the provided hint requires an index that does not exist on this collection, and raises InvalidOperation if this cursor has already been used.

index should be an index as passed to create_index() (e.g. [('field', ASCENDING)]) or the name of the index. If index is None any existing hint for this query is cleared. The last hint applied to this cursor takes precedence over all others.
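
For example, a minimal sketch, assuming an index on 'field' already exists:

from pymongo import ASCENDING

cursor = collection.find({'field': {'$gt': 10}}).hint([('field', ASCENDING)])
# Equivalent, using the index name instead of an index specifier:
cursor = collection.find({'field': {'$gt': 10}}).hint('field_1')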

Parameters:
  • index: index to hint on (as an index specifier)
limit(limit)

Limits the number of results to be returned by this cursor.

Raises TypeError if limit is not an integer. Raises InvalidOperation if this Cursor has already been used. The last limit applied to this cursor takes precedence. A limit of 0 is equivalent to no limit.

Parameters:
  • limit: the number of results to return

See also

The MongoDB documentation on

limit

max(spec)

Adds max operator that specifies upper bound for specific index.

When using max, hint() should also be configured to ensure the query uses the expected index; starting in MongoDB 4.2, hint() will be required.

Parameters:
  • spec: a list of field, limit pairs specifying the exclusive upper bound for all keys of a specific index in order.
max_await_time_ms(max_await_time_ms)

Specifies a time limit for a getMore operation on a TAILABLE_AWAIT cursor. For all other types of cursor max_await_time_ms is ignored.

Raises TypeError if max_await_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.

Note

max_await_time_ms requires server version >= 3.2

Parameters:
  • max_await_time_ms: the time limit after which the operation is aborted
max_scan(max_scan)

DEPRECATED - Limit the number of documents to scan when performing the query.

Raises InvalidOperation if this cursor has already been used. Only the last max_scan() applied to this cursor has any effect.

Parameters:
  • max_scan: the maximum number of documents to scan
max_time_ms(max_time_ms)

Specifies a time limit for a query operation. If the specified time is exceeded, the operation will be aborted and ExecutionTimeout is raised. If max_time_ms is None no limit is applied.

Raises TypeError if max_time_ms is not an integer or None. Raises InvalidOperation if this Cursor has already been used.
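
For example, a minimal sketch that abandons a query after two seconds:

from pymongo.errors import ExecutionTimeout

async def guarded_find(collection):
    try:
        return await collection.find().max_time_ms(2000).to_list(length=100)
    except ExecutionTimeout:
        return []  # the server aborted the operation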

Parameters:
  • max_time_ms: the time limit after which the operation is aborted
min(spec)

Adds min operator that specifies lower bound for specific index.

When using min, hint() should also be configured to ensure the query uses the expected index; starting in MongoDB 4.2, hint() will be required.
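
For example, a minimal sketch combining min() with max() and hint() (an index on 'value' is assumed):

from pymongo import ASCENDING

cursor = (collection.find()
          .hint([('value', ASCENDING)])  # required starting in MongoDB 4.2
          .min([('value', 10)])          # inclusive lower bound
          .max([('value', 100)]))        # exclusive upper bound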

Parameters:
  • spec: a list of field, limit pairs specifying the inclusive lower bound for all keys of a specific index in order.
next()

Advance the cursor.

New in version 2.2.

next_object()

DEPRECATED - Get next GridOut object from cursor.

rewind()

Rewind this cursor to its unevaluated state.

skip(skip)

Skips the first skip results of this cursor.

Raises TypeError if skip is not an integer. Raises ValueError if skip is less than 0. Raises InvalidOperation if this Cursor has already been used. The last skip applied to this cursor takes precedence.
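
For example, a minimal paging sketch combining skip() with limit() (page and page_size are assumed inputs):

def page_query(collection, page, page_size):
    return collection.find().sort('_id', 1).skip(page * page_size).limit(page_size)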

Parameters:
  • skip: the number of results to skip
sort(key_or_list, direction=None)

Sorts this cursor’s results.

Pass a field name and a direction, either ASCENDING or DESCENDING:

>>> async def f():
...     cursor = collection.find().sort('_id', pymongo.DESCENDING)
...     docs = await cursor.to_list(None)
...     print([d['_id'] for d in docs])
...
>>> IOLoop.current().run_sync(f)
[4, 3, 2, 1, 0]

To sort by multiple fields, pass a list of (key, direction) pairs:

>>> async def f():
...     cursor = collection.find().sort([
...         ('field1', pymongo.ASCENDING),
...         ('field2', pymongo.DESCENDING)])
...
...     docs = await cursor.to_list(None)
...     print([(d['field1'], d['field2']) for d in docs])
...
>>> IOLoop.current().run_sync(f)
[(0, 4), (0, 2), (0, 0), (1, 3), (1, 1)]

Text search results can be sorted by relevance:

>>> async def f():
...     cursor = collection.find({
...         '$text': {'$search': 'some words'}},
...         {'score': {'$meta': 'textScore'}})
...
...     # Sort by 'score' field.
...     cursor.sort([('score', {'$meta': 'textScore'})])
...     async for doc in cursor:
...         print('%.1f %s' % (doc['score'], doc['field']))
...
>>> IOLoop.current().run_sync(f)
1.5 words about some words
1.0 words

Raises InvalidOperation if this cursor has already been used. Only the last sort() applied to this cursor has any effect.

Parameters:
  • key_or_list: a single key or a list of (key, direction) pairs specifying the keys to sort on
  • direction (optional): only used if key_or_list is a single key, if not given ASCENDING is assumed
coroutine to_list(length)

Get a list of documents.

>>> from motor.motor_tornado import MotorClient
>>> collection = MotorClient().test.test_collection
>>>
>>> async def f():
...     cursor = collection.find().sort([('_id', 1)])
...     docs = await cursor.to_list(length=2)
...     while docs:
...         print(docs)
...         docs = await cursor.to_list(length=2)
...
...     print('done')
...
>>> ioloop.IOLoop.current().run_sync(f)
[{'_id': 0}, {'_id': 1}]
[{'_id': 2}, {'_id': 3}]
done
Parameters:
  • length: maximum number of documents to return for this call, or None

Returns a Future.

Changed in version 2.0: No longer accepts a callback argument.

Changed in version 0.2: callback must be passed as a keyword argument, like to_list(10, callback=callback), and the length parameter is no longer optional.

where(code)

Adds a $where clause to this query.

The code argument must be an instance of str or Code containing a JavaScript expression. This expression will be evaluated for each document scanned. Only those documents for which the expression evaluates to true will be returned as results. The keyword this refers to the object currently being scanned. For example:

# Find all documents where field "a" is less than "b" plus "c".
async for doc in db.test.find().where('this.a < (this.b + this.c)'):
    print(doc)

Raises TypeError if code is not an instance of str. Raises InvalidOperation if this MotorCursor has already been used. Only the last call to where() applied to a MotorCursor has any effect.

Note

MongoDB 4.4 drops support for Code with scope variables. Consider using $expr instead.

Parameters:
  • code: JavaScript expression to use as a filter
address

The (host, port) of the server used, or None.

Changed in version 3.0: Renamed from “conn_id”.

alive

Does this cursor have the potential to return more data?

This is mostly useful with tailable cursors since they will stop iterating even though they may return more results in the future.

With regular cursors, simply use an async for loop instead of alive:

async for doc in collection.find():
    print(doc)

Note

Even if alive is True, next() can raise StopIteration. alive can also be True while iterating a cursor from a failed server. In this case alive will return False after next() fails to retrieve the next batch of results from the server.

cursor_id

Returns the id of the cursor.

Useful if you need to manage cursor ids and want to handle killing cursors manually using kill_cursors().

New in version 2.2.

fetch_next

DEPRECATED - A Future used with gen.coroutine to asynchronously retrieve the next document in the result set, fetching a batch of documents from the server if necessary. Resolves to False if there are no more documents, otherwise next_object() is guaranteed to return a document:

Attention

The fetch_next property is deprecated and will be removed in Motor 3.0. Use async for to iterate elegantly and efficiently over MotorCursor objects instead:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     async for doc in collection.find():
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

While it appears that fetch_next retrieves each document from the server individually, the cursor actually fetches documents efficiently in large batches. Example usage:

>>> async def f():
...     await collection.drop()
...     await collection.insert_many([{'_id': i} for i in range(5)])
...     cursor = collection.find().sort([('_id', 1)])
...     while (await cursor.fetch_next):
...         doc = cursor.next_object()
...         sys.stdout.write(str(doc['_id']) + ', ')
...     print('done')
...
>>> IOLoop.current().run_sync(f)
0, 1, 2, 3, 4, done

Changed in version 2.2: Deprecated.

session

The cursor’s ClientSession, or None.

New in version 3.6.

motor.aiohttp - Integrate Motor with the aiohttp web framework

Serve GridFS files with Motor and aiohttp.

Requires Python 3.5 or later and aiohttp 3.0 or later.

See the AIOHTTPGridFS Example.

class motor.aiohttp.AIOHTTPGridFS(database, root_collection='fs', get_gridfs_file=<function get_gridfs_file>, get_cache_time=<function get_cache_time>, set_extra_headers=<function set_extra_headers>)

Serve files from GridFS.

This class is a request handler that serves GridFS files, similar to aiohttp’s built-in static file server.

client = AsyncIOMotorClient()
gridfs_handler = AIOHTTPGridFS(client.my_database)

app = aiohttp.web.Application()

# The GridFS URL pattern must have a "{filename}" variable.
resource = app.router.add_resource('/fs/{filename}')
resource.add_route('GET', gridfs_handler)
resource.add_route('HEAD', gridfs_handler)

app_handler = app.make_handler()
server = loop.create_server(app_handler, port=80)

By default, requests’ If-Modified-Since headers are honored, but no specific cache-control timeout is sent to clients. Thus each request for a GridFS file requires a quick check of the file’s uploadDate in MongoDB. Pass a custom get_cache_time() to customize this.

Parameters:
  • database: An AsyncIOMotorDatabase.
  • root_collection (optional): A string, the root collection name. Defaults to "fs".
  • get_gridfs_file (optional): Override to choose the GridFS file to serve at a URL.
  • get_cache_time (optional): Override to customize cache control behavior.
  • set_extra_headers (optional): Override to modify the response before sending it to the client.
motor.aiohttp.get_cache_time(filename, modified, mime_type)

Override to customize cache control behavior.

Return a positive number of seconds to trigger aggressive caching, or 0 to mark the resource as merely cacheable. 0 is the default.

For example, to allow image caching:

def image_cache_time(filename, modified, mime_type):
    if mime_type.startswith('image/'):
        return 3600

    return 0

client = AsyncIOMotorClient()
gridfs_handler = AIOHTTPGridFS(client.my_database,
                               get_cache_time=image_cache_time)
Parameters:
  • filename: A string, the URL portion matching {filename} in the URL pattern
  • modified: A datetime, when the matching GridFS file was created
  • mime_type: The file’s type, a string like “application/octet-stream”
motor.aiohttp.get_gridfs_file(bucket, filename, request)

Override to choose a GridFS file to serve at a URL.

By default, if a URL pattern like /fs/{filename} is mapped to this AIOHTTPGridFS, then the filename portion of the URL is used as the filename, so a request for “/fs/image.png” results in a call to AsyncIOMotorGridFSBucket.open_download_stream_by_name() with “image.png” as the filename argument. To customize the mapping of path to GridFS file, override get_gridfs_file and return an asyncio.Future that resolves to an AsyncIOMotorGridOut.

For example, to retrieve the file by _id instead of filename:

def get_gridfile_by_id(bucket, filename, request):
    # "filename" is interpreted as _id instead of name.
    # Return a Future AsyncIOMotorGridOut.
    return bucket.open_download_stream(file_id=filename)

client = AsyncIOMotorClient()
gridfs_handler = AIOHTTPGridFS(client.my_database,
                               get_gridfs_file=get_gridfile_by_id)
Parameters:
  • bucket: The AsyncIOMotorGridFSBucket serving this handler's files.
  • filename: A string, the URL portion matching {filename} in the URL pattern.
  • request: An aiohttp.web.Request.
motor.aiohttp.set_extra_headers(response, gridout)

Override to modify the response before sending to client.

For example, to send a Content-Encoding header:

def gzip_header(response, gridout):
    response.headers['Content-Encoding'] = 'gzip'

client = AsyncIOMotorClient()
gridfs_handler = AIOHTTPGridFS(client.my_database,
                               set_extra_headers=gzip_header)
Parameters:
  • response: An aiohttp.web.Response.
  • gridout: The AsyncIOMotorGridOut about to be served.

This page describes using Motor with asyncio. For Tornado integration, see Motor Tornado API.

Logo by Musho Rodney Alan Greenblat