Source code for asyncmongo.auth

from .utils.saslprep import saslprep
import hashlib
import hmac
import os
import typing
from base64 import standard_b64encode, standard_b64decode
from collections import namedtuple

from asyncmongo.exceptions import AuthenticationFailedError

from .utils import _xor


MongoCredential = namedtuple(
    "MongoCredential",
    ["mechanism", "source", "username", "password", "mechanism_properties", "cache"],
)
"""A hashable namedtuple of values used for authentication."""


[docs] async def try_authenticate(connection, credentials: MongoCredential): username = credentials.username user = username.encode("utf-8").replace(b"=", b"=3D").replace(b",", b"=2C") nonce = standard_b64encode(os.urandom(32)) first_bare = b"n=" + user + b",r=" + nonce payload = b"n,," + bytes(first_bare) cmd = { "saslStart": 1, "mechanism": credentials.mechanism, "payload": payload, "autoAuthorize": 1, "options": {"skipEmptyExchange": True}, } try: resp = await connection.command(command=cmd, database_name=credentials.source) except Exception as e: raise AuthenticationFailedError(e) print(resp) server_first = resp["payload"] # make as a new function parse parsed = dict( typing.cast(typing.Tuple[bytes, bytes], item.split(b"=", 1)) for item in resp.get("payload").split(b",") ) iterations = int(parsed[b"i"]) if iterations < 4096: raise Exception("Server returned an invalid iteration count.") salt = parsed[b"s"] rnonce = parsed[b"r"] without_proof = b"c=biws,r=" + rnonce client_key, csalt, citerations = None, None, None data = saslprep(credentials.password).encode("utf-8") _hmac = hmac.HMAC digestmod = hashlib.sha256 # Salt and / or iterations could change for a number of different # reasons. Either changing invalidates the cache. if not client_key or salt != csalt or iterations != citerations: salted_pass = hashlib.pbkdf2_hmac( "sha256", data, standard_b64decode(salt), iterations ) client_key = _hmac(salted_pass, b"Client Key", digestmod).digest() # server_key = _hmac(salted_pass, b"Server Key", digestmod).digest() # cache.data = (client_key, server_key, salt, iterations) stored_key = digestmod(client_key).digest() auth_msg = b",".join((first_bare, server_first, without_proof)) client_sig = _hmac(stored_key, auth_msg, digestmod).digest() client_proof = b"p=" + standard_b64encode(_xor(client_key, client_sig)) client_final = b",".join((without_proof, client_proof)) # server_sig = standard_b64encode(_hmac(server_key, auth_msg, digestmod).digest()) cmd = { "saslContinue": 1, "conversationId": resp["conversationId"], "payload": client_final, } resp = await connection.command(command=cmd, database_name="admin") if not resp.get("done"): raise Exception("Could not authenticate with server")