From 01305df053031a9da2b364db69073d4824682139 Mon Sep 17 00:00:00 2001 From: classabbyamp Date: Tue, 30 Jul 2024 20:46:31 -0400 Subject: [PATCH] wip --- netauth/__init__.py | 902 +++++++++++++++++++++++++++++++++++++++++ netauth/_kv.py | 58 +++ netauth/_types.py | 171 ++++++++ netauth/v2/__init__.py | 90 ++++ test.py | 27 ++ 5 files changed, 1248 insertions(+) create mode 100644 netauth/__init__.py create mode 100644 netauth/_kv.py create mode 100644 netauth/_types.py create mode 100644 netauth/v2/__init__.py create mode 100644 test.py diff --git a/netauth/__init__.py b/netauth/__init__.py new file mode 100644 index 0000000..2763fef --- /dev/null +++ b/netauth/__init__.py @@ -0,0 +1,902 @@ +import socket +from pathlib import Path + +import grpc +import tomllib + +from . import _pb, v2 +from ._kv import ( + KVDict, + from_KVData, + parse_KVDict, + to_KVValue, +) +from ._types import ( + Capability, + Entity, + EntityMeta, + ExpansionMode, + Group, +) + +__all__ = [ + "Capability", + "Entity", + "EntityMeta", + "ExpansionMode", + "Group", + "KVDict", + "NetAuth2", + "v2", +] + +__version__ = "0.0.1" + + +class NetAuth2: + def __init__(self, server: str, port: int = 1729, cert: str | Path | bytes | None = None): + """TODO""" + self.server = server + self.port = port + + # seems very unlikely to error, but possible + # CPython is inscrutable about what kind of error it could be + try: + self.client_name = socket.gethostname() + except: # noqa: E722 + self.client_name = "BOGUS_CLIENT" + self.service_name = "netauth" + + self.credentials = None + + if isinstance(cert, str): + cert = Path(cert) + if isinstance(cert, Path): + with cert.open("rb") as cf: + cert = cf.read() + if cert is not None: + self.credentials = grpc.ssl_channel_credentials(root_certificates=cert) + + target = f"{self.server}:{self.port}" + + if self.credentials is not None: + self.channel = grpc.secure_channel(target, self.credentials) + else: + self.channel = grpc.insecure_channel(target) + + self.stub = _pb.v2.NetAuth2Stub(self.channel) + + @classmethod + def with_config(cls, path: str | Path): + """TODO""" + # TODO: other config values? + if isinstance(path, str): + path = Path(path) + with path.open("rb") as f: + cfg = tomllib.load(f) + + server = cfg["core"]["server"] + port = cfg["core"].get("port", 1729) + cert = None + if "tls" in cfg: + cert = cfg["tls"].get("certificate") + return NetAuth2(server, port, cert) + + def close(self): + """TODO""" + self.channel.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + + def __del__(self): + try: + self.close() + except: # noqa: E722 + pass + + def __metadata(self, auth: bool = False) -> list[tuple[str, str]]: + """ + Generate metadata for an RPC request. + + :param auth: whether to include the authorization header (default: ``False``) + :type auth: bool, optional + :return: the metadata for the request + :rtype: list[tuple[str, str]] + """ + meta = [ + ("client-name", self.client_name), + ("service-name", self.service_name), + ] + if auth: + # TODO + meta.append(("authorization", "")) + return meta + + def entity_create(self, id: str, secret: str, number: int = -1): + """ + Create a new entity. + + :param id: the new entity's ID. Must be unique. + :type id: str + :param secret: the new entity's secret. + :type secret: str + :param number: the new entity's number. Strongly recommended to be unique. + If not provided, the next valid number will be selected and assigned. + :type number: int, optional + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writeable + + self.stub.EntityCreate( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity( + ID=id, + secret=secret, + Number=number, + ), + ), + metadata=self.__metadata(), + ) + + def entity_update(self, id: str, meta: EntityMeta): + """ + Update the generic metadata on an existing entity. + + :param id: the entity's ID + :type id: str + :param meta: the new metadata values + :type meta: EntityMeta + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writeable + self.stub.EntityUpdate( + request=_pb.v2.EntityRequest( + Data=_pb.Entity( + ID=id, + meta=meta._into(), + ), + ), + metadata=self.__metadata(), + ) + + def entity_info(self, id: str) -> Entity | None: + """ + Returns information about an entity. Does not require authentication. + + :param id: the entity's ID + :type id: str + :raises grpc.RpcError: if the gRPC request fails + :return: the entity or ``None`` if not found + :rtype: Entity | None + """ + resp: _pb.v2.ListOfEntities = self.stub.EntityInfo( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity( + ID=id, + ), + ), + metadata=self.__metadata(), + ) + if resp.Entities is not None and len(resp.Entities) > 0: + return Entity._from(resp.Entities[0]) + + def entity_search(self, expr: str) -> list[Entity]: + """ + Searches all entities. Does not require authentication. + + :param expr: expression to search for + :type expr: str + :raises grpc.RpcError: if the gRPC request fails + :return: list of entities that match the search criteria + :rtype: list[Entity] + """ + resp: _pb.v2.ListOfEntities = self.stub.EntitySearch( + request=_pb.v2.SearchRequest(expression=expr), + metadata=self.__metadata(), + ) + if resp.Entities is not None: + return [Entity._from(e) for e in resp.Entities] + return [] + + def entity_untyped_meta(self, id: str, action: v2.Action, key: str, value: str = "") -> KVDict | None: + """ + Perform operations on the untyped key-value store on an entity. Read + operations do not require authentication. + + :param id: target entity ID + :type id: str + :param action: action to perform (``Upsert``, ``Read``, ``ClearFuzzy``, or ``ClearExact``) + :type action: v2.Action + :param key: key to read from or write to. If ``'*'``, operate on all keys. + :type key: str + :param value: value to write, if writing + :type value: str, optional + :raises grpc.RpcError: if the gRPC request fails + :return: a dictionary of the requested keys and values if reading, nothing if writing or clearing + :rtype: KVDict | None + """ + if action != v2.Action.Read: + # TODO: make writeable + ... + + resp: _pb.v2.ListOfStrings = self.stub.EntityUM( + request=_pb.v2.KVRequest( + Target=id, + Action=action._into(), + Key=key, + Value=value, + ), + metadata=self.__metadata(), + ) + + if action == v2.Action.Read: + kv = parse_KVDict(list(resp.Strings)) + if key != "*": + return {key: kv[key]} + return kv + + def entity_kv_add(self, id: str, key: str, values: list[str]): + """ + Add a key to the specified entity. The key must not already exist. + Value order will be preserved. + + :param id: target entity ID + :type id: str + :param key: the key to add + :type key: str + :param values: the values to add + :type values: list[str] + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + vals = [to_KVValue(v, i) for i, v in enumerate(values)] + + self.stub.EntityKVAdd( + request=_pb.v2.KV2Request( + Target=id, + Data=_pb.KVData( + Key=key, + Values=vals, + ), + ), + metadata=self.__metadata(), + ) + + def entity_kv_get(self, id: str, key: str) -> KVDict: + """ + Returns values for the requested key if it exists. + + :param id: target entity ID + :type id: str + :param key: the key to retrieve. If ``'*'``, get all keys. + :type key: str + :raises grpc.RpcError: if the gRPC request fails + :return: the key-value mapping requested + :rtype: KVDict + """ + resp: _pb.v2.ListOfKVData = self.stub.EntityKVGet( + request=_pb.v2.KV2Request( + Target=id, + Data=_pb.KVData(Key=key), + ), + metadata=self.__metadata(), + ) + return from_KVData(list(resp.KVData)) + + def entity_kv_del(self, id: str, key: str): + """ + Delete a key from an entity. + + :param id: target entity ID + :type id: str + :param key: the key to delete + :type key: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.EntityKVDel( + request=_pb.v2.KV2Request( + Target=id, + Data=_pb.KVData(Key=key), + ), + metadata=self.__metadata(), + ) + + def entity_kv_replace(self, id: str, key: str, values: list[str]): + """ + Replace a key for the specified entity. The key must already exist. + Value order will be preserved. + + :param id: target entity ID + :type id: str + :param key: the key to replace + :type key: str + :param values: the values to add + :type values: list[str] + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + vals = [to_KVValue(v, i) for i, v in enumerate(values)] + + self.stub.EntityKVReplace( + request=_pb.v2.KV2Request( + Target=id, + Data=_pb.KVData( + Key=key, + Values=vals, + ), + ), + metadata=self.__metadata(), + ) + + def entity_keys(self, id: str, action: v2.Action, kind: str, key: str) -> KVDict | None: + """ + Read, write, or drop public keys stored on an entity. Does not require authentication. + + :param id: target entity ID + :type id: str + :param action: action to perform (``Add``, ``Drop``, or ``Read``) + :type action: v2.Action + :param kind: type of key. If ``'*'``, operate on all key types. + :type kind: str + :param key: key value + :type key: str + :raises grpc.RpcError: if the gRPC request fails + :return: a dictionary of the requested keys and values if reading, nothing if writing or clearing + :rtype: KVDict | None + """ + if action != v2.Action.Read: + # TODO: make writable + ... + + resp: _pb.v2.ListOfStrings = self.stub.EntityKeys( + request=_pb.v2.KVRequest( + Target=id, + Action=action._into(), + Key=kind, + Value=key, + ), + metadata=self.__metadata(), + ) + + if action == v2.Action.Read: + kv = parse_KVDict(list(resp.Strings)) + if key != "*": + return {kind: kv[kind]} + return kv + + def entity_destroy(self, id: str): + """ + Permanently remove an entity from the server.This is not recommended + and should not be done without good reason. The best practice is to + instead have a group that defunct entities get moved to and then locked. + This will prevent authentication, while maintaining integrity of the + backing tree. This function does not maintain referential integrity, so + be careful about removing the last standing admin of a particular type. + + :param id: target entity ID + :type id: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.EntityDestroy( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity(ID=id), + ), + metadata=self.__metadata(), + ) + + def entity_lock(self, id: str): + """ + Sets the lock bit on the target entity. Prevents authentication from + proceeding even if correct authentication information is provided. + + :param id: target entity ID + :type id: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.EntityLock( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity(ID=id), + ), + metadata=self.__metadata(), + ) + + def entity_unlock(self, id: str): + """ + Unsets the lock bit on the target entity. + + :param id: target entity ID + :type id: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.EntityUnlock( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity(ID=id), + ), + metadata=self.__metadata(), + ) + + def entity_groups(self, id: str) -> list[Group]: + """ + Retrieves the effective group memberships of the target entity. + + :param id: target entity ID + :type id: str + :raises grpc.RpcError: if the gRPC request fails + """ + resp: _pb.v2.ListOfGroups = self.stub.EntityGroups( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity(ID=id), + ), + metadata=self.__metadata(), + ) + return [Group._from(g) for g in resp.Groups] + + def auth_entity(self, id: str, secret: str): + """ + Perform authentication for an entity. Does not acquire a token. + + :param id: target entity ID + :type id: str + :param secret: target entity's secret + :type secret: str + :raises grpc.RpcError: if the gRPC request fails + """ + self.stub.AuthEntity( + request=_pb.v2.AuthRequest( + Entity=_pb.Entity(ID=id), + Secret=secret, + ), + metadata=self.__metadata(), + ) + + def auth_get_token(self, id: str, secret: str) -> str: + """ + Perform authentication for an entity and acquire a token if successful. + This token can be used to authenticate future requests. + + :param id: target entity ID + :type id: str + :param secret: target entity's secret + :type secret: str + :raises grpc.RpcError: if the gRPC request fails + :return: token, if successful + :rtype: str + """ + resp: _pb.v2.AuthResult = self.stub.AuthGetToken( + request=_pb.v2.AuthRequest( + Entity=_pb.Entity(ID=id), + Secret=secret, + ), + metadata=self.__metadata(), + ) + return resp.Token + + def auth_validate_token(self, token: str): + """ + Perform server-side validation of a token. + + :param token: token to check + :type token: str + :raises grpc.RpcError: if the gRPC request fails + """ + self.stub.AuthValidateToken( + request=_pb.v2.AuthRequest(Token=token), + metadata=self.__metadata(), + ) + + def auth_change_secret(self, id: str, secret: str, old_secret: str | None = None): + """ + Change the secret for an entity. If the entity is changing its own + secret, the old secret should be provided. If an administrator is + changing the secret, the request must be authenticated with a token. + + :param id: target entity ID + :type id: str + :param secret: target entity's new secret + :type secret: str + :param old_secret: target entity's current secret, if needed + :type old_secret: str | None, optional + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.AuthChangeSecret( + request=_pb.v2.AuthRequest( + Entity=_pb.Entity( + ID=id, + secret=old_secret, + ), + Secret=secret, + ), + metadata=self.__metadata(), + ) + + def group_create(self, name: str, display_name: str, managed_by: str | None = None, number: int = -1): + """ + Create a new group. + + :param name: new group's name. Must be unique. + :type name: str + :param display_name: the new group's display name. + :type display_name: str + :param managed_by: the name of a group that manages the new group + :type managed_by: str | None, optional + :param number: the new group's number. Strongly recommended to be unique. + If not provided, the next valid number will be selected and assigned. + :type number: int, optional + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.GroupCreate( + request=_pb.v2.GroupRequest( + Group=_pb.Group( + Name=name, + DisplayName=display_name, + ManagedBy=managed_by, + Number=number, + ), + ), + metadata=self.__metadata(), + ) + + def group_update(self, group: Group): + """ + Update group information. Fields that cannot be rewritten will be ignored. + + :param group: new group information + :type group: Group + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.GroupUpdate( + request=_pb.v2.GroupRequest( + Group=group._into(), + ), + metadata=self.__metadata(), + ) + + def group_info(self, name: str) -> tuple[Group, list[Group]] | None: + """ + Returns information about a group. Does not require authentication. + + :param name: target group's name + :type name: str + :raises grpc.RpcError: if the gRPC request fails + :return: a tuple of the group and its managed groups, if found + :rtype: tuple[Group, list[Group]] | None + """ + resp: _pb.v2.ListOfGroups = self.stub.GroupInfo( + request=_pb.v2.GroupRequest( + Group=_pb.Group(Name=name), + ), + metadata=self.__metadata(), + ) + if resp.Groups is not None and len(resp.Groups) > 0: + group = Group._from(resp.Groups[0]) + try: + managed = self.group_search(f"ManagedBy:{name}") + except grpc.RpcError: + # assume error = no managed groups + managed = [] + return (group, managed) + + def group_untyped_meta(self, name: str, action: v2.Action, key: str, value: str = "") -> KVDict | None: + """ + Perform operations on the untyped key-value store on a group. Read + operations do not require authentication. + + :param name: target group's name + :type name: str + :param action: action to perform (``Upsert``, ``Read``, ``ClearFuzzy``, or ``ClearExact``) + :type action: v2.Action + :param key: key to read from or write to. If ``'*'``, operate on all keys. + :type key: str + :param value: value to write, if writing + :type value: str, optional + :raises grpc.RpcError: if the gRPC request fails + :return: a dictionary of the requested keys and values if reading, nothing if writing or clearing + :rtype: KVDict | None + """ + if action != v2.Action.Read: + # TODO: make writeable + ... + + resp: _pb.v2.ListOfStrings = self.stub.GroupUM( + request=_pb.v2.KVRequest( + Target=name, + Action=action._into(), + Key=key, + Value=value, + ), + metadata=self.__metadata(), + ) + + if action == v2.Action.Read: + kv = parse_KVDict(list(resp.Strings)) + if key != "*": + return {key: kv[key]} + return kv + + def group_kv_add(self, name: str, key: str, values: list[str]): + """ + Add a key to the specified group. The key must not already exist. + Value order will be preserved. + + :param name: target group name + :type name: str + :param key: the key to add + :type key: str + :param values: the values to add + :type values: list[str] + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + vals = [to_KVValue(v, i) for i, v in enumerate(values)] + + self.stub.GroupKVAdd( + request=_pb.v2.KV2Request( + Target=name, + Data=_pb.KVData( + Key=key, + Values=vals, + ), + ), + metadata=self.__metadata(), + ) + + def group_kv_get(self, name: str, key: str) -> KVDict: + """ + Returns values for the requested key if it exists. + + :param name: target group name + :type name: str + :param key: the key to retrieve. If ``'*'``, get all keys. + :type key: str + :raises grpc.RpcError: if the gRPC request fails + :return: the key-value mapping requested + :rtype: KVDict + """ + resp: _pb.v2.ListOfKVData = self.stub.GroupKVGet( + request=_pb.v2.KV2Request( + Target=name, + Data=_pb.KVData(Key=key), + ), + metadata=self.__metadata(), + ) + return from_KVData(list(resp.KVData)) + + def group_kv_del(self, name: str, key: str): + """ + Delete a key from a group. + + :param name: target group name + :type name: str + :param key: the key to delete + :type key: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.GroupKVDel( + request=_pb.v2.KV2Request( + Target=name, + Data=_pb.KVData(Key=key), + ), + metadata=self.__metadata(), + ) + + def group_kv_replace(self, name: str, key: str, values: list[str]): + """ + Replace a key for the specified group. The key must already exist. + Value order will be preserved. + + :param name: target group name + :type name: str + :param key: the key to replace + :type key: str + :param values: the values to add + :type values: list[str] + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + vals = [to_KVValue(v, i) for i, v in enumerate(values)] + + self.stub.GroupKVReplace( + request=_pb.v2.KV2Request( + Target=name, + Data=_pb.KVData( + Key=key, + Values=vals, + ), + ), + metadata=self.__metadata(), + ) + + def group_update_rules(self, name: str, action: v2.RuleAction, target: str): + """ + Manage rules on groups. Rules can an transparently include other groups, + recursively remove members, or reset the behavior of a group to default. + + :param name: group name + :type name: str + :param action: type of rule action to take + :type action: v2.RuleAction + :param target: target group name + :type target: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.GroupUpdateRules( + request=_pb.v2.GroupRulesRequest( + Group=_pb.Group(Name=name), + Target=_pb.Group(Name=target), + RuleAction=action._into(), + ), + metadata=self.__metadata(), + ) + + def group_add_member(self, group: str, entity: str): + """ + Add a member to a group. + + :param group: target group name + :type group: str + :param entity: ID of the entity to add + :type entity: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.GroupAddMember( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity(ID=entity, meta=_pb.EntityMeta(Groups=[group])), + ), + metadata=self.__metadata(), + ) + + def group_del_member(self, group: str, entity: str): + """ + Remove a member from a group. + + :param group: target group name + :type group: str + :param entity: ID of the entity to remove + :type entity: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.GroupDelMember( + request=_pb.v2.EntityRequest( + Entity=_pb.Entity(ID=entity, meta=_pb.EntityMeta(Groups=[group])), + ), + metadata=self.__metadata(), + ) + + def group_destroy(self, name: str): + """ + Permanently remove a group from the server. This is not recommended as + NetAuth does not perform internal referential integrity checks, so it + is possible to remove a group that has rules pointing at it or + otherwise create cycles in the graph. The best practices are to keep + groups forever. They're cheap and as long as they're not queried they + don't represent additional load. + + :param name: target group name + :type name: str + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.GroupDestroy( + request=_pb.v2.GroupRequest( + Group=_pb.Group(Name=name), + ), + metadata=self.__metadata(), + ) + + def group_members(self, name: str) -> list[Entity]: + """ + Returns the membership of a group, include any alterations from rules. + + :param name: the name of the group + :type name: str + :raises grpc.RpcError: if the gRPC request fails + :return: the group members + :rtype: list[Entity] + """ + resp: _pb.v2.ListOfEntities = self.stub.GroupMembers( + request=_pb.v2.GroupRequest( + Group=_pb.Group(Name=name), + ), + metadata=self.__metadata(), + ) + return [Entity._from(e) for e in resp.Entities] + + def group_search(self, expr: str) -> list[Group]: + """ + Searches all groups. Does not require authentication. + + :param expr: expression to search for + :type expr: str + :raises grpc.RpcError: if the gRPC request fails + :return: list of groups that match the search criteria + :rtype: list[Group] + """ + resp: _pb.v2.ListOfGroups = self.stub.GroupSearch( + request=_pb.v2.SearchRequest(expression=expr), + metadata=self.__metadata(), + ) + if resp.Groups is not None: + return [Group._from(e) for e in resp.Groups] + return [] + + def system_capabilities(self, target: str, action: v2.Action, capability: Capability, direct: bool): + """ + Modify capabilities within the server. + + :param target: ID of the entity or group + :type target: str + :param action: action to perform (``Add`` or ``Drop``) + :type action: v2.Action + :param capability: capability to add or drop + :type capability: Capability + :param direct: TODO ??? + :type direct: bool + :raises grpc.RpcError: if the gRPC request fails + """ + # TODO: make writable + + self.stub.SystemCapabilities( + request=_pb.v2.CapabilityRequest( + Direct=direct, + Target=target, + Action=action._into(), + Capability=capability._into(), + ), + metadata=self.__metadata(), + ) + + def system_ping(self): + """ + Pings the server. Returns successfully if the server is healthy. + + :raises grpc.RpcError: if the gRPC request fails + """ + self.stub.SystemPing( + request=_pb.v2.Empty(), + metadata=self.__metadata(), + ) + + def system_status(self) -> v2.ServerStatus: + """ + Returns detailed status information about the server. + + :raises grpc.RpcError: if the gRPC request fails + :return: server status information + :rtype: v2.ServerStatus + """ + resp: _pb.v2.ServerStatus = self.stub.SystemStatus( + request=_pb.v2.Empty(), + metadata=self.__metadata(), + ) + return v2.ServerStatus._from(resp) diff --git a/netauth/_kv.py b/netauth/_kv.py new file mode 100644 index 0000000..e2ff9ea --- /dev/null +++ b/netauth/_kv.py @@ -0,0 +1,58 @@ +import re + +from . import _pb + + +def to_KVValue(value: str, index: int | None = None) -> _pb.KVValue: + return _pb.KVValue(Value=value, Index=index) + + +def from_KVValue(value: _pb.KVValue) -> str: + return value.Value + + +type KVDict = dict[str, list[str]] + + +def to_KVData(value: dict[str, list[str]]) -> list[_pb.KVData]: + out = [] + for key, val in value: + vals = [to_KVValue(value=v, index=i) for i, v in enumerate(val)] + out.append(_pb.KVData(Key=key, Values=vals)) + return out + + +def from_KVData(value: list[_pb.KVData]) -> KVDict: + return {kv.Key: [from_KVValue(v) for v in kv.Values] for kv in list(value)} + + +_kv_idx = re.compile(r"{(\d+)}$") + + +def parse_KVDict(raw: list[str]) -> KVDict: + """ + Convert a list of ``key:value``-format strings into a ``KVDict``. + If ``key`` is of the format ``key{n}``, it is inserted at index ``n``. + This is for converting KV1 values to a more reasonable format. + + :param raw: the raw key/value data + :type raw: list[str] + :return: the parsed KV data + :rtype: KVDict + """ + out: KVDict = {} + + for kv in raw: + k, _, v = kv.partition(":") + + idx = -1 + if (match := _kv_idx.search(k)) is not None: + idx = int(match.group(1)) + k = _kv_idx.sub(repl="", string=k, count=1) + + if k in out.keys(): + out[k].insert(idx, v) + else: + out[k] = [v] + + return out diff --git a/netauth/_types.py b/netauth/_types.py new file mode 100644 index 0000000..a1aa7d0 --- /dev/null +++ b/netauth/_types.py @@ -0,0 +1,171 @@ +from dataclasses import dataclass, field +from enum import Enum + +from . import _pb +from ._kv import ( + KVDict, + from_KVData, + to_KVData, +) + + +class Capability(Enum): + """System capabilities""" + + GlobalRoot = 0 + CreateEntity = 10 + DestroyEntity = 11 + ModifyEntityMeta = 12 + ModifyEntityKeys = 13 + ChangeEntitySecret = 14 + LockEntity = 15 + UnlockEntity = 16 + CreateGroup = 20 + DestroyGroup = 21 + ModifyGroupMeta = 22 + ModifyGroupMembers = 23 + + def _into(self) -> _pb.Capability: + """Convert the object into its RPC counterpart""" + return _pb.Capability(self.value) + + +class ExpansionMode(Enum): + """Group expansion modes""" + + Include = 1 + Exclude = 2 + Drop = 99 + + def _into(self) -> _pb.ExpansionMode: + """Convert the object into its RPC counterpart""" + return _pb.ExpansionMode(self.value) + + +@dataclass +class EntityMeta: + """Entity metadata""" + + primary_group: str | None = None + gecos: str | None = None + legal_name: str | None = None + display_name: str | None = None + home: str | None = None + shell: str | None = None + graphical_shell: str | None = None + badge_number: str | None = None + locked: bool = False + groups: list[str] = field(default_factory=list) + capabilities: list[Capability] = field(default_factory=list) + keys: list[str] = field(default_factory=list) + untyped_meta: list[str] = field(default_factory=list) + kv: KVDict = field(default_factory=dict) + + @classmethod + def _from(cls, inner: _pb.EntityMeta): + """Convert the object from its RPC counterpart""" + return cls( + primary_group=inner.PrimaryGroup, + gecos=inner.GECOS, + legal_name=inner.LegalName, + display_name=inner.DisplayName, + home=inner.Home, + shell=inner.Shell, + graphical_shell=inner.GraphicalShell, + badge_number=inner.BadgeNumber, + locked=inner.Locked, + groups=list(inner.Groups), + capabilities=[Capability(c) for c in inner.Capabilities], + keys=list(inner.Keys), + untyped_meta=list(inner.UntypedMeta), + kv=from_KVData(list(inner.KV)), + ) + + def _into(self) -> _pb.EntityMeta: + """Convert the object into its RPC counterpart""" + return _pb.EntityMeta( + PrimaryGroup=self.primary_group, + GECOS=self.gecos, + LegalName=self.legal_name, + DisplayName=self.display_name, + Home=self.home, + Shell=self.shell, + GraphicalShell=self.graphical_shell, + BadgeNumber=self.badge_number, + Locked=self.locked, + Groups=self.groups, + Capabilities=[c._into() for c in self.capabilities], + Keys=self.keys, + UntypedMeta=self.untyped_meta, + KV=to_KVData(self.kv), + ) + + +@dataclass +class Entity: + """Entity information""" + + id: str | None = None + number: int | None = None + secret: str | None = None + meta: EntityMeta | None = None + + @classmethod + def _from(cls, inner: _pb.Entity): + """Convert the object from its RPC counterpart""" + return cls( + id=inner.ID, + number=inner.Number, + secret=inner.secret, + meta=EntityMeta._from(inner.meta), + ) + + def _into(self) -> _pb.Entity: + """Convert the object into its RPC counterpart""" + return _pb.Entity( + ID=self.id, + Number=self.number, + secret=self.secret, + meta=self.meta._into() if self.meta is not None else None, + ) + + +@dataclass +class Group: + """Group information""" + + name: str | None = None + display_name: str | None = None + number: int | None = None + managed_by: str | None = None + capabilities: list[Capability] = field(default_factory=list) + expansions: list[str] = field(default_factory=list) + untyped_meta: list[str] = field(default_factory=list) + kv: KVDict = field(default_factory=dict) + + @classmethod + def _from(cls, inner: _pb.Group): + """Convert the object from its RPC counterpart""" + return cls( + name=inner.Name, + display_name=inner.DisplayName, + number=inner.Number, + managed_by=inner.ManagedBy, + capabilities=[Capability(c) for c in inner.Capabilities], + expansions=list(inner.Expansions), + untyped_meta=list(inner.UntypedMeta), + kv=from_KVData(list(inner.KV)), + ) + + def _into(self) -> _pb.Group: + """Convert the object into its RPC counterpart""" + return _pb.Group( + Name=self.name, + DisplayName=self.display_name, + Number=self.number, + ManagedBy=self.managed_by, + Capabilities=[c._into() for c in self.capabilities], + Expansions=self.expansions, + UntypedMeta=self.untyped_meta, + KV=to_KVData(self.kv), + ) diff --git a/netauth/v2/__init__.py b/netauth/v2/__init__.py new file mode 100644 index 0000000..1bc87ff --- /dev/null +++ b/netauth/v2/__init__.py @@ -0,0 +1,90 @@ +from dataclasses import dataclass, field +from enum import Enum + +from .. import _pb + +__all__ = [ + "Action", + "RuleAction", + "SubSystemStatus", + "ServerStatus", +] + + +class Action(Enum): + """Actions available for various operations""" + + Add = 1 + Drop = 2 + Upsert = 3 + ClearExact = 4 + ClearFuzzy = 5 + Read = 6 + + def _into(self) -> _pb.v2.Action: + """Convert the object into its RPC counterpart""" + return _pb.v2.Action(self.value) + + +class RuleAction(Enum): + """Actions available for group rule operations""" + + Include = 1 + Exclude = 2 + RemoveRule = 99 + + def _into(self) -> _pb.v2.RuleAction: + """Convert the object into its RPC counterpart""" + return _pb.v2.RuleAction(self.value) + + +@dataclass +class SubSystemStatus: + """Subsystem status information""" + + name: str | None = None + ok: bool = True + fault_message: str | None = None + + @classmethod + def _from(cls, inner: _pb.v2.SubSystemStatus): + """Convert the object from its RPC counterpart""" + return cls( + name=inner.Name, + ok=inner.OK, + fault_message=inner.FaultMessage, + ) + + def _into(self) -> _pb.v2.SubSystemStatus: + """Convert the object into its RPC counterpart""" + return _pb.v2.SubSystemStatus( + Name=self.name, + OK=self.ok, + FaultMessage=self.fault_message, + ) + + +@dataclass +class ServerStatus: + """Server status information""" + + system_ok: bool = False + first_failure: SubSystemStatus | None = None + subsystems: list[SubSystemStatus] = field(default_factory=list) + + @classmethod + def _from(cls, inner: _pb.v2.ServerStatus): + """Convert the object from its RPC counterpart""" + return cls( + system_ok=inner.SystemOK, + first_failure=SubSystemStatus._from(inner.FirstFailure), + subsystems=[SubSystemStatus._from(s) for s in inner.SubSystems], + ) + + def _into(self) -> _pb.v2.ServerStatus: + """Convert the object into its RPC counterpart""" + return _pb.v2.ServerStatus( + SystemOK=self.system_ok, + FirstFailure=self.first_failure._into() if self.first_failure is not None else None, + SubSystems=[s._into() for s in self.subsystems], + ) diff --git a/test.py b/test.py new file mode 100644 index 0000000..44c31e2 --- /dev/null +++ b/test.py @@ -0,0 +1,27 @@ +from pathlib import Path +from pprint import pprint + +from grpc._channel import _InactiveRpcError + +import netauth + +# with netauth.NetAuth2.with_config("/home/abi/.netauth/config-void.toml") as na: +# with netauth.NetAuth2("netauth.voidlinux.org", cert="/home/abi/.netauth/keys/tls-void.pem") as na: +with netauth.NetAuth2("netauth.miaow.io", cert="/home/abi/.netauth/keys/tls-miaow.pem") as na: + try: + resp = na.system_status() + print(resp) + except _InactiveRpcError as e: + print(e) + + try: + resp = na.entity_info(entity=netauth.Entity(id="abby"), data=netauth.Entity(id="abby")) + pprint(resp) + except _InactiveRpcError as e: + print(e) + + try: + resp = na.entity_kv_get(target="abby", keys=["awoo"]) + pprint(resp) + except _InactiveRpcError as e: + print(e)