AsyncIOMotorChangeStream
Warning
As of May 14th, 2025, Motor is deprecated in favor of the GA release of the PyMongo Async API. No new features will be added to Motor, and only bug fixes will be provided until it reaches end of life on May 14th, 2026. After that, only critical bug fixes will be made until final support ends on May 14th, 2027. We strongly recommend migrating to the PyMongo Async API while Motor is still supported. For help transitioning, see the Migrate to PyMongo Async guide.
- class motor.motor_asyncio.AsyncIOMotorChangeStream(target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, start_at_operation_time, session, start_after, comment, full_document_before_change, show_expanded_events)
- async close()
Close this change stream.
Stops any “async for” loops using this change stream.
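As a rough sketch of how close() might be used to end a consumer loop from another coroutine: the helper names consume and run_for_a_while are hypothetical, and stream is assumed to be a change stream returned by a watch() call. Nothing below is part of Motor itself.

```python
import asyncio


async def consume(stream, handle):
    # The loop ends on its own once the stream has been closed.
    async for change in stream:
        handle(change)


async def run_for_a_while(stream, seconds, handle):
    # Hypothetical helper: consume changes for `seconds`, then close.
    consumer = asyncio.create_task(consume(stream, handle))
    await asyncio.sleep(seconds)
    await stream.close()  # stops the "async for" loop in consume()
    await consumer        # wait for the consumer task to finish
```

Awaiting the consumer task after close() lets the caller observe any exception the loop raised instead of leaving the task unattended.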
- async next()
Advance the cursor.
This method blocks until the next change document is returned or an unrecoverable error is raised. This method is used when iterating over all changes in the cursor. For example:
    import logging

    import pymongo.errors

    # Assumes `db` is an existing database handle.
    async def watch_collection():
        resume_token = None
        pipeline = [{'$match': {'operationType': 'insert'}}]
        try:
            async with db.collection.watch(pipeline) as stream:
                async for insert_change in stream:
                    print(insert_change)
                    resume_token = stream.resume_token
        except pymongo.errors.PyMongoError:
            # The ChangeStream encountered an unrecoverable error or the
            # resume attempt failed to recreate the cursor.
            if resume_token is None:
                # There is no usable resume token because there was a
                # failure during ChangeStream initialization.
                logging.error('...')
            else:
                # Use the interrupted ChangeStream's resume token to
                # create a new ChangeStream. The new stream will
                # continue from the last seen insert change without
                # missing any events.
                async with db.collection.watch(
                        pipeline, resume_after=resume_token) as stream:
                    async for insert_change in stream:
                        print(insert_change)
Raises:
StopAsyncIteration if this change stream is closed.

In addition to using an “async for” loop as shown in the code example above, you can also iterate the change stream by calling await change_stream.next() repeatedly.
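Calling next() directly could look like the sketch below. The helper take_changes and its limit parameter are hypothetical names introduced for illustration; stream is assumed to be a change stream returned by a watch() call.

```python
async def take_changes(stream, limit):
    """Collect up to `limit` change documents by calling next() directly."""
    changes = []
    while len(changes) < limit:
        try:
            # Blocks until a change arrives or the stream is closed.
            changes.append(await stream.next())
        except StopAsyncIteration:
            # The stream was closed before `limit` changes arrived.
            break
    return changes
```

Compared with an “async for” loop, this form makes it easy to stop after a fixed number of changes without closing the stream.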
- async try_next()
Advance the cursor without blocking indefinitely.
This method returns the next change document without waiting indefinitely for the next change. If no changes are available, it returns None. For example:
    while change_stream.alive:
        change = await change_stream.try_next()
        # Note that the ChangeStream's resume token may be updated
        # even when no changes are returned.
        print("Current resume token: %r" % (change_stream.resume_token,))
        if change is not None:
            print("Change document: %r" % (change,))
            continue
        # We end up here when there are no recent changes.
        # Sleep for a while before trying again to avoid flooding
        # the server with getMore requests when no changes are
        # available.
        await asyncio.sleep(10)
If no change document is cached locally, this method runs a single getMore command. If the getMore yields any documents, the next document is returned; otherwise, if the getMore returns no documents (because there have been no changes), None is returned.
- Returns:
The next change document, or None when no document is available after running a single getMore or when the cursor is closed.
Added in version 2.1.
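One way to use try_next() is to drain whatever changes are currently available and then return, rather than waiting for new ones. The sketch below assumes stream is a change stream returned by a watch() call; drain_available and max_docs are hypothetical names, not part of Motor.

```python
async def drain_available(stream, max_docs=100):
    """Return the changes available right now, plus the latest resume token."""
    batch = []
    while stream.alive and len(batch) < max_docs:
        change = await stream.try_next()
        if change is None:
            # Nothing was returned: no recent changes, or the cursor closed.
            break
        batch.append(change)
    # The resume token may have advanced even if the batch is empty.
    return batch, stream.resume_token
```

Returning the resume token alongside the batch lets the caller checkpoint progress even when no change documents were returned.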
- property alive
Does this cursor have the potential to return more data?
Note
Even if alive is True, next() can raise StopAsyncIteration and try_next() can return None.
- property resume_token
The cached resume token that will be used to resume after the most recently returned change.
Added in version 3.9.
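The resume token can be stored between runs so that a later watch() call continues where the previous one stopped. The following is a minimal sketch under stated assumptions: collection is assumed to expose watch(pipeline, resume_after=...), checkpoint is a hypothetical dict-like store, and watch_from_checkpoint and handle are illustrative names rather than Motor APIs.

```python
async def watch_from_checkpoint(collection, pipeline, checkpoint, handle):
    """Process changes, resuming from a previously saved token if present."""
    # None means "no saved token": the stream starts from the current time.
    token = checkpoint.get("resume_token")
    async with collection.watch(pipeline, resume_after=token) as stream:
        async for change in stream:
            handle(change)
            # Save the token only after the change has been handled, so a
            # crash in handle() replays the change instead of skipping it.
            checkpoint["resume_token"] = stream.resume_token
```

Saving the token after handling each change gives at-least-once processing; a handler that cannot tolerate replays would need its own deduplication.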