[docs]classCursor:""" Represents a cursor for iterating over query results from a MongoDB collection. Supports asynchronous iteration and query options such as skip and limit. """def__init__(self,collection,skip:int=0,limit:int=0,batch_size:int=0,filter:dict|None=None,):""" Initializes a Cursor instance. Args: collection: The collection to query. skip (int): Number of documents to skip. Defaults to 0. limit (int): Maximum number of documents to retrieve. Defaults to 0 (no limit). batch_size (int): Number of documents to retrieve per batch (currently unused). Defaults to 0. filter (dict | None): Query filter to apply. Defaults to None. """self._collection=collectionself._skip=skipself._limit=limitself._batch_size=batch_sizeself._filter=filterself._id:str|None=None# id of the cursorself._data:deque=deque()self._connection=Noneself._killed:bool=Falsedef__aiter__(self)->"Cursor":""" Returns the cursor instance for asynchronous iteration. Returns: Cursor: The current cursor instance. """returnselfdef_create_command(self):""" Creates the MongoDB command for fetching query results. Returns: dict: The command to execute. """ifself._idandself._id!=0:return{"getMore":self._id,"collection":self._collection._name}cmd={"find":self._collection._name}ifself._filter:cmd.update({"filter":self._filter})ifself._skip:cmd.update({"skip":self._skip})ifself._limit:cmd.update({"limit":self._limit})returncmdasyncdef_refresh(self):""" Fetches the next batch of documents from the server. """ifself._killed:returnFalsecmd=self._create_command()ifnotself._connection:self._connection=self._collection._database._get_connection()res=awaitself._connection.command(command=cmd,database_name=self._collection._database.name)self._id=res["cursor"]["id"]ifself._id==0:self._killed=Trueresults=list(res["cursor"].values())self._data.extend(results[0])returnlen(self._data)asyncdef__anext__(self):""" Asynchronous iterator method to fetch the next document. Returns: dict: The next document in the result set. Raises: StopAsyncIteration: If no more documents are available. """iflen(self._data):returnself._data.popleft()is_refreshed=awaitself._refresh()ifnotis_refreshed:raiseStopAsyncIterationreturnself._data.popleft()def_check_okay_to_chain(self):""" Checks if chaining methods like `skip` or `limit` is allowed. Raises: Exception: If the cursor has already been executed. """ifself._idisnotNone:returnException("Cannot set options after executing query")
[docs]defskip(self,skip:int)->"Cursor":""" Sets the number of documents to skip. Args: skip (int): Number of documents to skip. Returns: Cursor: The current cursor instance. Raises: TypeError: If `skip` is not an integer. ValueError: If `skip` is negative. """ifnotisinstance(skip,int):raiseTypeError("Skip must be an integer")ifskip<0:raiseValueError("Skip must be >= 0")self._check_okay_to_chain()self._skip=skipreturnself
[docs]deflimit(self,limit)->"Cursor":""" Sets the maximum number of documents to retrieve. Args: limit (int): Maximum number of documents to retrieve. Returns: Cursor: The current cursor instance. Raises: TypeError: If `limit` is not an integer. """ifnotisinstance(limit,int):raiseTypeError("limit must be an integer")self._check_okay_to_chain()self._limit=limitreturnself