[docs]classAsyncMongoConnection:""" Manages an asynchronous connection to a MongoDB server, enabling communication and authentication for database operations. """def__init__(self):""" Initializes an AsyncMongoConnection instance with uninitialized reader, writer, and options. """self.reader:StreamReader|None=Noneself.writer:StreamWriter|None=Noneself.options:ClientOptions|None=None
[docs]@classmethodasyncdefcreate(cls,host:str|None="localhost",port:int|None=27017,options:ClientOptions|None=None,)->"AsyncMongoConnection":""" Establishes an asynchronous connection to a MongoDB server. Args: host (str | None): Hostname or IP address of the MongoDB server. Defaults to "localhost". port (int | None): Port number of the MongoDB server. Defaults to 27017. options (ClientOptions | None): Optional client options for authentication. Returns: AsyncMongoConnection: An instance of the connection. """self=cls()self.options=optionsself.reader,self.writer=awaitasyncio.open_connection(host,port)ifoptionsandoptions.usernameandoptions.password:awaitself._authenticate()print(f"Connected to MongoDB at {host} port {port}")returnself
asyncdef_authenticate(self):""" Authenticates the connection using the provided credentials. Raises: Exception: If authentication fails. """creds=MongoCredential(mechanism="SCRAM-SHA-256",source=self.options.database,username=self.options.username,password=self.options.password,mechanism_properties="",cache="",)awaittry_authenticate(self,credentials=creds)
[docs]asyncdefsend(self,payload:bytes)->list|None:""" Sends a payload to the MongoDB server and reads the response. Args: payload (bytes): The serialized payload to send. Returns: list | None: The parsed response documents, or None if no data is received. """self.writer.write(payload)awaitself.writer.drain()returnawaitself.read()
[docs]asyncdefread(self)->list|None:""" Reads a response from the MongoDB server. Returns: list | None: A list of BSON documents parsed from the server response, or None if no data is received. """op_msg_size=ctypes.sizeof(OP_MSG)header_data=awaitself.reader.read(op_msg_size)ifnotheader_data:print("No data received for header")returnNoneop_msg=OP_MSG.from_buffer_copy(header_data)# Calculate the size of the documents fielddocuments_size=op_msg.messageLength-op_msg_sizeifdocuments_size<=0:print("No documents present or invalid message length")returnNone# Read the documents datadocuments_data=awaitself.reader.read(documents_size)ifnotdocuments_data:print("No data received for documents")returnNone# Parse the BSON documentsdocuments=bson.loads(documents_data)ifnotdocuments:returnNonereturndocuments
[docs]asyncdefcommand(self,database_name:str,command:dict)->list|None:""" Sends a command to the MongoDB server. Args: database_name (str): The name of the database for the command. command (dict): The command to execute. Returns: list | None: The response from the server, parsed as a list of BSON documents. """command.update({"$db":database_name})payload=OP_MSG.new(command)returnawaitself.send(payload)