diff --git a/src/vaultwarden/clients/bitwarden.py b/src/vaultwarden/clients/bitwarden.py index 8c8858e..ca39615 100644 --- a/src/vaultwarden/clients/bitwarden.py +++ b/src/vaultwarden/clients/bitwarden.py @@ -67,10 +67,11 @@ def _refresh_connect_token(self): ) self._connect_token = ConnectToken.model_validate_json(resp.text) + import vaultwarden.models.bitwarden self._connect_token.master_key = make_master_key( password=self.password, salt=self.email, - iterations=self._connect_token.KdfIterations, + kdf=vaultwarden.models.bitwarden.Kdf.from_ConnectToken(self._connect_token), ) def _set_connect_token(self): @@ -91,10 +92,11 @@ def _set_connect_token(self): "identity/connect/token", headers=headers, data=payload ) self._connect_token = ConnectToken.model_validate_json(resp.text) + import vaultwarden.models.bitwarden self._connect_token.master_key = make_master_key( password=self.password, salt=self.email, - iterations=self._connect_token.KdfIterations, + kdf=vaultwarden.models.bitwarden.Kdf.from_ConnectToken(self._connect_token), ) # login to api diff --git a/src/vaultwarden/models/bitwarden.py b/src/vaultwarden/models/bitwarden.py index 50baea6..3bb57f7 100644 --- a/src/vaultwarden/models/bitwarden.py +++ b/src/vaultwarden/models/bitwarden.py @@ -1,8 +1,9 @@ -from typing import Generic, Literal, TypeVar, cast +from typing import Generic, Literal, TypeVar, cast, Any, Self, Annotated, Union from uuid import UUID +import datetime -from pydantic import AliasChoices, Field, TypeAdapter, field_validator -from pydantic_core.core_schema import FieldValidationInfo +from pydantic import AliasChoices, Field, TypeAdapter, field_validator, RootModel, PrivateAttr, model_validator, ValidationError, ModelWrapValidatorHandler, WrapValidator, AfterValidator +from pydantic_core.core_schema import FieldValidationInfo, ValidationInfo, ValidatorFunctionWrapHandler from vaultwarden.clients.bitwarden import BitwardenAPIClient from vaultwarden.models.enum import CipherType, OrganizationUserType @@ -10,6 +11,8 @@ from vaultwarden.models.permissive_model import PermissiveBaseModel from vaultwarden.utils.crypto import decrypt, encrypt +from src.vaultwarden.models.enum import KdfType + # Pydantic models for Bitwarden data structures T = TypeVar("T", bound="BitwardenBaseModel") @@ -36,13 +39,158 @@ def api_client(self) -> BitwardenAPIClient: assert self.bitwarden_client is not None return self.bitwarden_client +def decodeBytes(value: Any, handler: ValidatorFunctionWrapHandler, info:ValidationInfo) -> str: + for key in info.context["cctx"][::-1]: + try: + return decrypt(handler(value), key) + except Exception as e: + continue + raise e + + +def decodeString(value: Any, handler: ValidatorFunctionWrapHandler, info:ValidationInfo) -> str: + return decodeBytes(value, handler, info=info).decode("utf-8") + + +class UriMatch(BitwardenBaseModel): + class Config: + extra = "forbid" + + match: int|None = None + uri: Annotated[str, WrapValidator(decodeString)]|None = None + uriChecksum: Annotated[str, WrapValidator(decodeString)]|None = None + response: str|None = None + +class XField(BitwardenBaseModel): + class Config: + extra = "forbid" + + name: Annotated[str, WrapValidator(decodeString)] | None = None + response: Annotated[str, WrapValidator(decodeString)]|None = None + type: int + value: Annotated[str, WrapValidator(decodeString)] | None = None + linkedId: str | None = None + + +class CipherLogin(BitwardenBaseModel): + class Config: + extra = "forbid" + + name: Annotated[str, WrapValidator(decodeString)]|None = None + autofillOnPageLoad: bool|None = None + password: Annotated[str, WrapValidator(decodeString)]|None = None + passwordRevisionDate: datetime.datetime|None = None + totp: str|None = None + uri : Annotated[str, WrapValidator(decodeString)]|None = None + uris: list[UriMatch] | None = None + username: Annotated[str, WrapValidator(decodeString)]|None = None + notes: Annotated[str, WrapValidator(decodeString)] | None = None + +class PasswordChange(BitwardenBaseModel): + class Config: + extra = "forbid" + + lastUsedDate: datetime.datetime + password: str + +class fido2Credential(BitwardenBaseModel): + class Config: + extra = "forbid" + + counter: Annotated[str, WrapValidator(decodeString)] | None = None + creationDate: datetime.datetime|None = None + credentialId: Annotated[str, WrapValidator(decodeString)] | None = None + discoverable: Annotated[str, WrapValidator(decodeString)] | None = None + keyAlgorithm: Annotated[str, WrapValidator(decodeString)] | None = None + keyCurve: Annotated[str, WrapValidator(decodeString)] | None = None + keyType: Annotated[str, WrapValidator(decodeString)] | None = None + keyValue: Annotated[str, WrapValidator(decodeString)] | None = None + response: str|None = None + rpId: Annotated[str, WrapValidator(decodeString)] | None = None + rpName: Annotated[str, WrapValidator(decodeString)] | None = None + userDisplayName: Annotated[str, WrapValidator(decodeString)] | None = None + userHandle: Annotated[str, WrapValidator(decodeString)] | None = None + userName: Annotated[str, WrapValidator(decodeString)] | None = None + + +class LoginData(CipherLogin): + class Config: + extra = "forbid" + + fields: list[XField] | None = None + passwordHistory: list[PasswordChange] | None = None + response: str|None = None + fido2Credentials: list[fido2Credential] | None = None + + +class SecureNoteData(CipherLogin): + class Config: + extra = "forbid" + + fields: list[XField] + passwordHistory: list[PasswordChange] + response: str | None = None + type: int|None = None + +class SecureNoteProperty(BitwardenBaseModel): + class Config: + extra = "forbid" + + name: Annotated[str, WrapValidator(decodeString)] | None = None + notes: Annotated[str, WrapValidator(decodeString)] | None = None + fields: list[XField]|None = None + passwordHistory: list[PasswordChange]|None = None + response: Annotated[str, WrapValidator(decodeString)]|None = None + type: int + +class Attachment(BitwardenBaseModel): + class Config: + extra = "forbid" + fileName: Annotated[str, WrapValidator(decodeString)]|None = None + id: str + key: str|None = None #Annotated[str, WrapValidator(decodeBytes)]|None = None + object: str + size: int + sizeName: str + url: str + +class _CipherBase(BitwardenBaseModel): + class Config: + extra = "forbid" -class CipherDetails(BitwardenBaseModel): Id: UUID | None = None OrganizationId: UUID | None = Field(None, validate_default=True) Type: CipherType - Name: str + Name: Annotated[str, WrapValidator(decodeString)] CollectionIds: list[UUID] + key: str|None = None + + organizationUseTotp: bool|None = None + creationDate: datetime.datetime|None = None + deletedDate: datetime.datetime|None = None + fields: list[XField] | None = None + + notes: Annotated[str, WrapValidator(decodeString)] | None = None + reprompt: int + revisionDate: str + sshKey: str|None + passwordHistory: list[PasswordChange] + object: str|None = None + attachments: list[Attachment]|None=None + + + @model_validator(mode='wrap') + @classmethod + def set_key(cls, data: Any, handler: ModelWrapValidatorHandler[Self], info:ValidationInfo) -> Self: + if data.get("key") is not None: + info.context["cctx"].append(decrypt(data["key"],info.context["cctx"][0])) + + v = handler(data) + + if data.get("key") is not None: + info.context["cctx"].pop() + + return v @field_validator("OrganizationId") @classmethod @@ -87,6 +235,48 @@ def update_collection(self, collections: list[UUID]): json={"collectionIds": dump}, ) +class Login(_CipherBase): + Type: Literal[1] + + login: LoginData|None = None + secureNote: None = None + card: None = None + identity: None = None + + data: LoginData|None = None + +class SecureNote(_CipherBase): + Type: Literal[2] + + login: None = None + secureNote: SecureNoteProperty|None = None + card: None = None + identity: None = None + + data: SecureNoteData|None = None + +class Card(_CipherBase): + Type: Literal[3] + + login: None = None + card: None = None + secureNote: None = None + identity: None = None + + data: None = None + +class Identity(_CipherBase): + Type: Literal[4] + + login: None = None + secureNote: None = None + card: None = None + identity: None = None + + data: None = None + +CipherDetails = Annotated[Union[Login, SecureNote, Card, Identity], Field(discriminator='Type')] + class CollectionAccess(BitwardenBaseModel): ReadOnly: bool = False @@ -541,14 +731,11 @@ def _get_ciphers(self) -> list[CipherDetails]: "api/ciphers/organization-details", params={"organizationId": self.Id}, ) + org_key = self.key() res = ResplistBitwarden[CipherDetails].model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context={"parent_id": self.Id, "client": self.api_client, "cctx":[org_key]}, ) - org_key = self.key() - # map each cipher name to the decrypted name - for cipher in res.Data: - cipher.Name = decrypt(cipher.Name, org_key).decode("utf-8") return res.Data def ciphers( @@ -572,14 +759,13 @@ def ciphers( def key(self): sync = self.api_client.sync() - raw_key = None for org in sync.Profile.Organizations: if org.Id == self.Id: - raw_key = org.Key break - if raw_key is not None: - return decrypt(raw_key, self.api_client.connect_token.orgs_key) - raise BitwardenError(f"No Organizations `{self.Id}` found") + else: + raise BitwardenError(f"No Organizations `{self.Id}` found") + return decrypt(org.Key, self.api_client.connect_token.orgs_key) + def get_organization( @@ -592,3 +778,15 @@ def get_organization( resp.text, context={"client": bitwarden_client, "parent_id": organisation_id}, ) + +import dataclasses +@dataclasses.dataclass +class Kdf: + Kdf: KdfType + KdfIterations: int | None = None + KdfMemory: int | None = None + KdfParallelism: int | None = None + + @classmethod + def from_ConnectToken(cls, token: "vaultwarden.clients.bitwarden.ConnectToken"): + return cls(token.Kdf, token.KdfIterations, token.KdfMemory, token.KdfParallelism) \ No newline at end of file diff --git a/src/vaultwarden/models/enum.py b/src/vaultwarden/models/enum.py index 2b0b57c..74b6f6e 100644 --- a/src/vaultwarden/models/enum.py +++ b/src/vaultwarden/models/enum.py @@ -27,3 +27,7 @@ class VaultwardenUserStatus(IntEnum): Enabled = 0 Invited = 1 Disabled = 2 + +class KdfType(IntEnum): + Pbkdf2 = 0 + Argon2id = 1 \ No newline at end of file diff --git a/src/vaultwarden/models/sync.py b/src/vaultwarden/models/sync.py index 9171620..90d5443 100644 --- a/src/vaultwarden/models/sync.py +++ b/src/vaultwarden/models/sync.py @@ -1,15 +1,18 @@ import time from uuid import UUID +import pydantic from pydantic import AliasChoices, Field, field_validator from vaultwarden.models.enum import VaultwardenUserStatus from vaultwarden.models.permissive_model import PermissiveBaseModel from vaultwarden.utils.crypto import decrypt +from src.vaultwarden.models.enum import KdfType + class ConnectToken(PermissiveBaseModel): - Kdf: int = 0 + Kdf: KdfType = KdfType.Pbkdf2 KdfIterations: int = 0 KdfMemory: int | None = None KdfParallelism: int | None = None @@ -22,7 +25,8 @@ class ConnectToken(PermissiveBaseModel): scope: str unofficialServer: bool = False ResetMasterPassword: bool | None = None - master_key: bytes | None = None + + master_key: bytes | None = None #pydantic.PrivateAttr(default=None) @field_validator("expires_in") @classmethod diff --git a/src/vaultwarden/utils/crypto.py b/src/vaultwarden/utils/crypto.py index 641e58e..947e040 100644 --- a/src/vaultwarden/utils/crypto.py +++ b/src/vaultwarden/utils/crypto.py @@ -19,7 +19,6 @@ from Crypto.PublicKey import RSA from hkdf import hkdf_expand - class CIPHERS(IntEnum): sym = 2 asym = 4 @@ -69,6 +68,7 @@ def decode_cipher_string(cipher_string): """decode a cipher tring into it's parts""" iv = None mac = None + assert cipher_string is not None if not ENCRYPTED_STRING_RE.match(cipher_string): raise WrongFormatError(f"{cipher_string}") try: @@ -114,14 +114,23 @@ def is_encrypted(cipher_string): return True -def make_master_key(password, salt, iterations=ITERATIONS): - salt = salt.lower() - if not hasattr(password, "decode"): - password = password.encode("utf-8") - if not hasattr(salt, "decode"): - salt = salt.encode("utf-8") - return pbkdf2_hmac("sha256", password, salt, iterations) +def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden.Kdf"): + import vaultwarden.models.bitwarden + + assert isinstance(salt, str) + assert isinstance(password, str) + salt = salt.lower() + password = password.encode("utf-8") + salt = salt.encode("utf-8") + + match kdf.Kdf: + case vaultwarden.models.bitwarden.KdfType.Pbkdf2: + return pbkdf2_hmac("sha256", password, salt, kdf.KdfIterations) + case vaultwarden.models.bitwarden.KdfType.Argon2: + raise NotImplementedError("x") + case _: + return None def hash_password(password, salt, iterations=ITERATIONS): """base64-encode a wrapped, stretched password+salt(email) for signup/login"""