From 07ed442f13c264655893011bb5067e4ffa21aaee Mon Sep 17 00:00:00 2001 From: infinityofspace <30715462+infinityofspace@users.noreply.github.com> Date: Fri, 8 Dec 2023 20:27:16 +0100 Subject: [PATCH] replaced client return types with objects refs #14 --- pkb_client/client.py | 74 +++++++++++++++++++++++--------------------- pkb_client/dns.py | 17 ++++++++-- pkb_client/ssl.py | 16 ++++++++++ 3 files changed, 69 insertions(+), 38 deletions(-) create mode 100644 pkb_client/ssl.py diff --git a/pkb_client/client.py b/pkb_client/client.py index 57b694c..e25a8a0 100644 --- a/pkb_client/client.py +++ b/pkb_client/client.py @@ -1,22 +1,23 @@ import json import logging from pathlib import Path -from typing import Optional, Tuple, List +from typing import Optional, List from urllib.parse import urljoin import requests -from pkb_client.dns import DNSRecord, DNSRestoreMode +from pkb_client.dns import DNSRecord, DNSRestoreMode, DNSRecordType +from pkb_client.ssl import SSLCertBundle API_ENDPOINT = "https://porkbun.com/api/json/v3/" -SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"] # prevent urllib3 to log request with the api key and secret logging.getLogger("urllib3").setLevel(logging.WARNING) class PKBClientException(Exception): - pass + def __init__(self, status, message): + super().__init__(f"{status}: {message}") class PKBClient: @@ -67,11 +68,12 @@ def ping(self, **kwargs) -> str: return json.loads(r.text).get("yourIp", None) else: response_json = json.loads(r.text) - raise PKBClientException(f"{response_json['status']}: {response_json['message']}") + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) def dns_create(self, domain: str, - record_type: str, + record_type: DNSRecordType, content: str, name: Optional[str] = None, ttl: Optional[int] = 300, @@ -93,7 +95,6 @@ def dns_create(self, """ assert domain is not None and len(domain) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES assert content is not None and len(content) > 0 assert ttl is None or 300 <= ttl <= 2147483647 @@ -112,12 +113,13 @@ def dns_create(self, return str(json.loads(r.text).get("id", None)) else: response_json = json.loads(r.text) - raise PKBClientException(f"{response_json['status']}: {response_json['message']}") + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) def dns_edit(self, domain: str, record_id: str, - record_type: str, + record_type: DNSRecordType, content: str, name: str = None, ttl: int = 300, @@ -142,7 +144,6 @@ def dns_edit(self, assert domain is not None and len(domain) > 0 assert record_id is not None and len(record_id) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES assert content is not None and len(content) > 0 assert ttl is None or 300 <= ttl <= 2147483647 @@ -161,7 +162,8 @@ def dns_edit(self, return True else: response_json = json.loads(r.text) - raise PKBClientException(f"{response_json['status']}: {response_json['message']}") + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) def dns_delete(self, domain: str, @@ -188,7 +190,8 @@ def dns_delete(self, return True else: response_json = json.loads(r.text) - raise PKBClientException(f"{response_json['status']}: {response_json['message']}") + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) def dns_retrieve(self, domain, **kwargs) -> List[DNSRecord]: """ @@ -210,7 +213,8 @@ def dns_retrieve(self, domain, **kwargs) -> List[DNSRecord]: return [DNSRecord.from_dict(record) for record in json.loads(r.text).get("records", [])] else: response_json = json.loads(r.text) - raise PKBClientException(f"{response_json['status']}: {response_json['message']}") + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) def dns_export(self, domain: str, filename: str, **kwargs) -> bool: """ @@ -233,7 +237,7 @@ def dns_export(self, domain: str, filename: str, **kwargs) -> bool: # merge the single DNS records into one single dict with the record id as key dns_records_dict = dict() for record in dns_records: - dns_records_dict[record["id"]] = record + dns_records_dict[record.id] = record filepath = Path(filename) if filepath.exists(): @@ -276,7 +280,7 @@ def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, * try: # delete all existing DNS records for record in existing_dns_records: - self.dns_delete(domain, record["id"]) + self.dns_delete(domain, record.id) # restore all exported records by creating new DNS records for _, exported_record in exported_dns_records_dict.items(): @@ -297,7 +301,7 @@ def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, * try: for existing_record in existing_dns_records: - record_id = existing_record["id"] + record_id = existing_record.id exported_record = exported_dns_records_dict.get(record_id, None) # also check if the exported dns record is different to the existing record, # so we can reduce unnecessary api calls @@ -320,7 +324,7 @@ def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, * existing_dns_records_dict = dict() for record in existing_dns_records: - existing_dns_records_dict[record["id"]] = record + existing_dns_records_dict[record.id] = record try: for _, exported_record in exported_dns_records_dict.items(): @@ -365,9 +369,9 @@ def set_dns_servers(self, domain: str, name_servers: List[str]) -> bool: if r.status_code == 200 and json.loads(r.text).get("status", None) == "SUCCESS": return True else: - raise Exception("ERROR: set dns servers api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + response_json = json.loads(r.text) + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) def get_dns_servers(self, domain: str) -> List[str]: """ @@ -386,9 +390,9 @@ def get_dns_servers(self, domain: str) -> List[str]: if r.status_code == 200: return json.loads(r.text).get("ns", []) else: - raise Exception("ERROR: get dns servers api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + response_json = json.loads(r.text) + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) def get_domain_pricing(self, **kwargs) -> dict: """ @@ -404,11 +408,11 @@ def get_domain_pricing(self, **kwargs) -> dict: if r.status_code == 200: return json.loads(r.text) else: - raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + response_json = json.loads(r.text) + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) - def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]: + def ssl_retrieve(self, domain, **kwargs) -> SSLCertBundle: """ API SSL bundle retrieve method: retrieve an SSL bundle for given domain see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info @@ -427,16 +431,14 @@ def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]: if r.status_code == 200: ssl_bundle = json.loads(r.text) - intermediate_certificate = ssl_bundle["intermediate_certificate"] - certificate_chain = ssl_bundle["certificate_chain"] - private_key = ssl_bundle["private_key"] - public_key = ssl_bundle["public_key"] - - return intermediate_certificate, certificate_chain, private_key, public_key + return SSLCertBundle(intermediate_certificate=ssl_bundle["intermediate_certificate"], + certificate_chain=ssl_bundle["certificate_chain"], + private_key=ssl_bundle["private_key"], + public_key=ssl_bundle["public_key"]) else: - raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + response_json = json.loads(r.text) + raise PKBClientException(response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message")) @staticmethod def __handle_error_backup__(dns_records): diff --git a/pkb_client/dns.py b/pkb_client/dns.py index b2054f8..2803fa3 100644 --- a/pkb_client/dns.py +++ b/pkb_client/dns.py @@ -2,11 +2,24 @@ from enum import Enum +class DNSRecordType(Enum): + A = "A" + AAAA = "AAAA" + MX = "MX" + CNAME = "CNAME" + ALIAS = "ALIAS" + TXT = "TXT" + NS = "NS" + SRV = "SRV" + TLSA = "TLSA" + CAA = "CAA" + + @dataclass class DNSRecord: id: str name: str - type: str + type: DNSRecordType content: str ttl: str prio: str @@ -17,7 +30,7 @@ def from_dict(d): return DNSRecord( id=d["id"], name=d["name"], - type=d["type"], + type=DNSRecordType[d["type"]], content=d["content"], ttl=d["ttl"], prio=d["prio"], diff --git a/pkb_client/ssl.py b/pkb_client/ssl.py new file mode 100644 index 0000000..b357601 --- /dev/null +++ b/pkb_client/ssl.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass + + +@dataclass +class SSLCertBundle: + # The intermediate certificate. + intermediate_certificate: str + + # The complete certificate chain. + certificate_chain: str + + # The private key. + private_key: str + + # The public key. + public_key: str