Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/vaultwarden/clients/bitwarden.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ def _refresh_connect_token(self):
)
self._connect_token = ConnectToken.model_validate_json(resp.text)

import vaultwarden.models.bitwarden
self._connect_token.master_key = make_master_key(
password=self.password,
salt=self.email,
iterations=self._connect_token.KdfIterations,
kdf=vaultwarden.models.bitwarden.Kdf.from_ConnectToken(self._connect_token),
)

def _set_connect_token(self):
Expand All @@ -91,10 +92,11 @@ def _set_connect_token(self):
"identity/connect/token", headers=headers, data=payload
)
self._connect_token = ConnectToken.model_validate_json(resp.text)
import vaultwarden.models.bitwarden
self._connect_token.master_key = make_master_key(
password=self.password,
salt=self.email,
iterations=self._connect_token.KdfIterations,
kdf=vaultwarden.models.bitwarden.Kdf.from_ConnectToken(self._connect_token),
)

# login to api
Expand Down
228 changes: 213 additions & 15 deletions src/vaultwarden/models/bitwarden.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Generic, Literal, TypeVar, cast
from typing import Generic, Literal, TypeVar, cast, Any, Self, Annotated, Union
from uuid import UUID
import datetime

from pydantic import AliasChoices, Field, TypeAdapter, field_validator
from pydantic_core.core_schema import FieldValidationInfo
from pydantic import AliasChoices, Field, TypeAdapter, field_validator, RootModel, PrivateAttr, model_validator, ValidationError, ModelWrapValidatorHandler, WrapValidator, AfterValidator
from pydantic_core.core_schema import FieldValidationInfo, ValidationInfo, ValidatorFunctionWrapHandler

from vaultwarden.clients.bitwarden import BitwardenAPIClient
from vaultwarden.models.enum import CipherType, OrganizationUserType
from vaultwarden.models.exception_models import BitwardenError
from vaultwarden.models.permissive_model import PermissiveBaseModel
from vaultwarden.utils.crypto import decrypt, encrypt

from src.vaultwarden.models.enum import KdfType

# Pydantic models for Bitwarden data structures

T = TypeVar("T", bound="BitwardenBaseModel")
Expand All @@ -36,13 +39,158 @@ def api_client(self) -> BitwardenAPIClient:
assert self.bitwarden_client is not None
return self.bitwarden_client

def decodeBytes(value: Any, handler: ValidatorFunctionWrapHandler, info:ValidationInfo) -> str:
for key in info.context["cctx"][::-1]:
try:
return decrypt(handler(value), key)
except Exception as e:
continue
raise e


def decodeString(value: Any, handler: ValidatorFunctionWrapHandler, info:ValidationInfo) -> str:
return decodeBytes(value, handler, info=info).decode("utf-8")


class UriMatch(BitwardenBaseModel):
class Config:
extra = "forbid"

match: int|None = None
uri: Annotated[str, WrapValidator(decodeString)]|None = None
uriChecksum: Annotated[str, WrapValidator(decodeString)]|None = None
response: str|None = None

class XField(BitwardenBaseModel):
class Config:
extra = "forbid"

name: Annotated[str, WrapValidator(decodeString)] | None = None
response: Annotated[str, WrapValidator(decodeString)]|None = None
type: int
value: Annotated[str, WrapValidator(decodeString)] | None = None
linkedId: str | None = None


class CipherLogin(BitwardenBaseModel):
class Config:
extra = "forbid"

name: Annotated[str, WrapValidator(decodeString)]|None = None
autofillOnPageLoad: bool|None = None
password: Annotated[str, WrapValidator(decodeString)]|None = None
passwordRevisionDate: datetime.datetime|None = None
totp: str|None = None
uri : Annotated[str, WrapValidator(decodeString)]|None = None
uris: list[UriMatch] | None = None
username: Annotated[str, WrapValidator(decodeString)]|None = None
notes: Annotated[str, WrapValidator(decodeString)] | None = None

class PasswordChange(BitwardenBaseModel):
class Config:
extra = "forbid"

lastUsedDate: datetime.datetime
password: str

class fido2Credential(BitwardenBaseModel):
class Config:
extra = "forbid"

counter: Annotated[str, WrapValidator(decodeString)] | None = None
creationDate: datetime.datetime|None = None
credentialId: Annotated[str, WrapValidator(decodeString)] | None = None
discoverable: Annotated[str, WrapValidator(decodeString)] | None = None
keyAlgorithm: Annotated[str, WrapValidator(decodeString)] | None = None
keyCurve: Annotated[str, WrapValidator(decodeString)] | None = None
keyType: Annotated[str, WrapValidator(decodeString)] | None = None
keyValue: Annotated[str, WrapValidator(decodeString)] | None = None
response: str|None = None
rpId: Annotated[str, WrapValidator(decodeString)] | None = None
rpName: Annotated[str, WrapValidator(decodeString)] | None = None
userDisplayName: Annotated[str, WrapValidator(decodeString)] | None = None
userHandle: Annotated[str, WrapValidator(decodeString)] | None = None
userName: Annotated[str, WrapValidator(decodeString)] | None = None


class LoginData(CipherLogin):
class Config:
extra = "forbid"

fields: list[XField] | None = None
passwordHistory: list[PasswordChange] | None = None
response: str|None = None
fido2Credentials: list[fido2Credential] | None = None


class SecureNoteData(CipherLogin):
class Config:
extra = "forbid"

fields: list[XField]
passwordHistory: list[PasswordChange]
response: str | None = None
type: int|None = None

class SecureNoteProperty(BitwardenBaseModel):
class Config:
extra = "forbid"

name: Annotated[str, WrapValidator(decodeString)] | None = None
notes: Annotated[str, WrapValidator(decodeString)] | None = None
fields: list[XField]|None = None
passwordHistory: list[PasswordChange]|None = None
response: Annotated[str, WrapValidator(decodeString)]|None = None
type: int

class Attachment(BitwardenBaseModel):
class Config:
extra = "forbid"
fileName: Annotated[str, WrapValidator(decodeString)]|None = None
id: str
key: str|None = None #Annotated[str, WrapValidator(decodeBytes)]|None = None
object: str
size: int
sizeName: str
url: str

class _CipherBase(BitwardenBaseModel):
class Config:
extra = "forbid"

class CipherDetails(BitwardenBaseModel):
Id: UUID | None = None
OrganizationId: UUID | None = Field(None, validate_default=True)
Type: CipherType
Name: str
Name: Annotated[str, WrapValidator(decodeString)]
CollectionIds: list[UUID]
key: str|None = None

organizationUseTotp: bool|None = None
creationDate: datetime.datetime|None = None
deletedDate: datetime.datetime|None = None
fields: list[XField] | None = None

notes: Annotated[str, WrapValidator(decodeString)] | None = None
reprompt: int
revisionDate: str
sshKey: str|None
passwordHistory: list[PasswordChange]
object: str|None = None
attachments: list[Attachment]|None=None


@model_validator(mode='wrap')
@classmethod
def set_key(cls, data: Any, handler: ModelWrapValidatorHandler[Self], info:ValidationInfo) -> Self:
if data.get("key") is not None:
info.context["cctx"].append(decrypt(data["key"],info.context["cctx"][0]))

v = handler(data)

if data.get("key") is not None:
info.context["cctx"].pop()

return v

@field_validator("OrganizationId")
@classmethod
Expand Down Expand Up @@ -87,6 +235,48 @@ def update_collection(self, collections: list[UUID]):
json={"collectionIds": dump},
)

class Login(_CipherBase):
Type: Literal[1]

login: LoginData|None = None
secureNote: None = None
card: None = None
identity: None = None

data: LoginData|None = None

class SecureNote(_CipherBase):
Type: Literal[2]

login: None = None
secureNote: SecureNoteProperty|None = None
card: None = None
identity: None = None

data: SecureNoteData|None = None

class Card(_CipherBase):
Type: Literal[3]

login: None = None
card: None = None
secureNote: None = None
identity: None = None

data: None = None

class Identity(_CipherBase):
Type: Literal[4]

login: None = None
secureNote: None = None
card: None = None
identity: None = None

data: None = None

CipherDetails = Annotated[Union[Login, SecureNote, Card, Identity], Field(discriminator='Type')]


class CollectionAccess(BitwardenBaseModel):
ReadOnly: bool = False
Expand Down Expand Up @@ -541,14 +731,11 @@ def _get_ciphers(self) -> list[CipherDetails]:
"api/ciphers/organization-details",
params={"organizationId": self.Id},
)
org_key = self.key()
res = ResplistBitwarden[CipherDetails].model_validate_json(
resp.text,
context={"parent_id": self.Id, "client": self.api_client},
context={"parent_id": self.Id, "client": self.api_client, "cctx":[org_key]},
)
org_key = self.key()
# map each cipher name to the decrypted name
for cipher in res.Data:
cipher.Name = decrypt(cipher.Name, org_key).decode("utf-8")
return res.Data

def ciphers(
Expand All @@ -572,14 +759,13 @@ def ciphers(

def key(self):
sync = self.api_client.sync()
raw_key = None
for org in sync.Profile.Organizations:
if org.Id == self.Id:
raw_key = org.Key
break
if raw_key is not None:
return decrypt(raw_key, self.api_client.connect_token.orgs_key)
raise BitwardenError(f"No Organizations `{self.Id}` found")
else:
raise BitwardenError(f"No Organizations `{self.Id}` found")
return decrypt(org.Key, self.api_client.connect_token.orgs_key)



def get_organization(
Expand All @@ -592,3 +778,15 @@ def get_organization(
resp.text,
context={"client": bitwarden_client, "parent_id": organisation_id},
)

import dataclasses
@dataclasses.dataclass
class Kdf:
Kdf: KdfType
KdfIterations: int | None = None
KdfMemory: int | None = None
KdfParallelism: int | None = None

@classmethod
def from_ConnectToken(cls, token: "vaultwarden.clients.bitwarden.ConnectToken"):
return cls(token.Kdf, token.KdfIterations, token.KdfMemory, token.KdfParallelism)
4 changes: 4 additions & 0 deletions src/vaultwarden/models/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ class VaultwardenUserStatus(IntEnum):
Enabled = 0
Invited = 1
Disabled = 2

class KdfType(IntEnum):
Pbkdf2 = 0
Argon2id = 1
8 changes: 6 additions & 2 deletions src/vaultwarden/models/sync.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import time
from uuid import UUID

import pydantic
from pydantic import AliasChoices, Field, field_validator

from vaultwarden.models.enum import VaultwardenUserStatus
from vaultwarden.models.permissive_model import PermissiveBaseModel
from vaultwarden.utils.crypto import decrypt

from src.vaultwarden.models.enum import KdfType


class ConnectToken(PermissiveBaseModel):
Kdf: int = 0
Kdf: KdfType = KdfType.Pbkdf2
KdfIterations: int = 0
KdfMemory: int | None = None
KdfParallelism: int | None = None
Expand All @@ -22,7 +25,8 @@ class ConnectToken(PermissiveBaseModel):
scope: str
unofficialServer: bool = False
ResetMasterPassword: bool | None = None
master_key: bytes | None = None

master_key: bytes | None = None #pydantic.PrivateAttr(default=None)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose using PrivateAttr here


@field_validator("expires_in")
@classmethod
Expand Down
25 changes: 17 additions & 8 deletions src/vaultwarden/utils/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from Crypto.PublicKey import RSA
from hkdf import hkdf_expand


class CIPHERS(IntEnum):
sym = 2
asym = 4
Expand Down Expand Up @@ -69,6 +68,7 @@ def decode_cipher_string(cipher_string):
"""decode a cipher tring into it's parts"""
iv = None
mac = None
assert cipher_string is not None
if not ENCRYPTED_STRING_RE.match(cipher_string):
raise WrongFormatError(f"{cipher_string}")
try:
Expand Down Expand Up @@ -114,14 +114,23 @@ def is_encrypted(cipher_string):
return True


def make_master_key(password, salt, iterations=ITERATIONS):
salt = salt.lower()
if not hasattr(password, "decode"):
password = password.encode("utf-8")
if not hasattr(salt, "decode"):
salt = salt.encode("utf-8")
return pbkdf2_hmac("sha256", password, salt, iterations)
def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden.Kdf"):
import vaultwarden.models.bitwarden

assert isinstance(salt, str)
assert isinstance(password, str)

salt = salt.lower()
password = password.encode("utf-8")
salt = salt.encode("utf-8")

match kdf.Kdf:
case vaultwarden.models.bitwarden.KdfType.Pbkdf2:
return pbkdf2_hmac("sha256", password, salt, kdf.KdfIterations)
case vaultwarden.models.bitwarden.KdfType.Argon2:
raise NotImplementedError("x")
case _:
return None

def hash_password(password, salt, iterations=ITERATIONS):
"""base64-encode a wrapped, stretched password+salt(email) for signup/login"""
Expand Down