Skip to content

Commit

Permalink
replaced client return types with objects refs #14
Browse files Browse the repository at this point in the history
  • Loading branch information
infinityofspace committed Dec 8, 2023
1 parent e58e81d commit 07ed442
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 38 deletions.
74 changes: 38 additions & 36 deletions pkb_client/client.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import json
import logging
from pathlib import Path
from typing import Optional, Tuple, List
from typing import Optional, List
from urllib.parse import urljoin

import requests

from pkb_client.dns import DNSRecord, DNSRestoreMode
from pkb_client.dns import DNSRecord, DNSRestoreMode, DNSRecordType
from pkb_client.ssl import SSLCertBundle

API_ENDPOINT = "https://porkbun.com/api/json/v3/"
SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"]

# prevent urllib3 to log request with the api key and secret
logging.getLogger("urllib3").setLevel(logging.WARNING)


class PKBClientException(Exception):
pass
def __init__(self, status, message):
super().__init__(f"{status}: {message}")


class PKBClient:
Expand Down Expand Up @@ -67,11 +68,12 @@ def ping(self, **kwargs) -> str:
return json.loads(r.text).get("yourIp", None)
else:
response_json = json.loads(r.text)
raise PKBClientException(f"{response_json['status']}: {response_json['message']}")
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def dns_create(self,
domain: str,
record_type: str,
record_type: DNSRecordType,
content: str,
name: Optional[str] = None,
ttl: Optional[int] = 300,
Expand All @@ -93,7 +95,6 @@ def dns_create(self,
"""

assert domain is not None and len(domain) > 0
assert record_type in SUPPORTED_DNS_RECORD_TYPES
assert content is not None and len(content) > 0
assert ttl is None or 300 <= ttl <= 2147483647

Expand All @@ -112,12 +113,13 @@ def dns_create(self,
return str(json.loads(r.text).get("id", None))
else:
response_json = json.loads(r.text)
raise PKBClientException(f"{response_json['status']}: {response_json['message']}")
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def dns_edit(self,
domain: str,
record_id: str,
record_type: str,
record_type: DNSRecordType,
content: str,
name: str = None,
ttl: int = 300,
Expand All @@ -142,7 +144,6 @@ def dns_edit(self,

assert domain is not None and len(domain) > 0
assert record_id is not None and len(record_id) > 0
assert record_type in SUPPORTED_DNS_RECORD_TYPES
assert content is not None and len(content) > 0
assert ttl is None or 300 <= ttl <= 2147483647

Expand All @@ -161,7 +162,8 @@ def dns_edit(self,
return True
else:
response_json = json.loads(r.text)
raise PKBClientException(f"{response_json['status']}: {response_json['message']}")
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def dns_delete(self,
domain: str,
Expand All @@ -188,7 +190,8 @@ def dns_delete(self,
return True
else:
response_json = json.loads(r.text)
raise PKBClientException(f"{response_json['status']}: {response_json['message']}")
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def dns_retrieve(self, domain, **kwargs) -> List[DNSRecord]:
"""
Expand All @@ -210,7 +213,8 @@ def dns_retrieve(self, domain, **kwargs) -> List[DNSRecord]:
return [DNSRecord.from_dict(record) for record in json.loads(r.text).get("records", [])]
else:
response_json = json.loads(r.text)
raise PKBClientException(f"{response_json['status']}: {response_json['message']}")
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def dns_export(self, domain: str, filename: str, **kwargs) -> bool:
"""
Expand All @@ -233,7 +237,7 @@ def dns_export(self, domain: str, filename: str, **kwargs) -> bool:
# merge the single DNS records into one single dict with the record id as key
dns_records_dict = dict()
for record in dns_records:
dns_records_dict[record["id"]] = record
dns_records_dict[record.id] = record

filepath = Path(filename)
if filepath.exists():
Expand Down Expand Up @@ -276,7 +280,7 @@ def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, *
try:
# delete all existing DNS records
for record in existing_dns_records:
self.dns_delete(domain, record["id"])
self.dns_delete(domain, record.id)

# restore all exported records by creating new DNS records
for _, exported_record in exported_dns_records_dict.items():
Expand All @@ -297,7 +301,7 @@ def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, *

try:
for existing_record in existing_dns_records:
record_id = existing_record["id"]
record_id = existing_record.id
exported_record = exported_dns_records_dict.get(record_id, None)
# also check if the exported dns record is different to the existing record,
# so we can reduce unnecessary api calls
Expand All @@ -320,7 +324,7 @@ def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, *

existing_dns_records_dict = dict()
for record in existing_dns_records:
existing_dns_records_dict[record["id"]] = record
existing_dns_records_dict[record.id] = record

try:
for _, exported_record in exported_dns_records_dict.items():
Expand Down Expand Up @@ -365,9 +369,9 @@ def set_dns_servers(self, domain: str, name_servers: List[str]) -> bool:
if r.status_code == 200 and json.loads(r.text).get("status", None) == "SUCCESS":
return True
else:
raise Exception("ERROR: set dns servers api call was not successfully\n"
"Status code: {}\n"
"Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
response_json = json.loads(r.text)
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def get_dns_servers(self, domain: str) -> List[str]:
"""
Expand All @@ -386,9 +390,9 @@ def get_dns_servers(self, domain: str) -> List[str]:
if r.status_code == 200:
return json.loads(r.text).get("ns", [])
else:
raise Exception("ERROR: get dns servers api call was not successfully\n"
"Status code: {}\n"
"Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
response_json = json.loads(r.text)
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def get_domain_pricing(self, **kwargs) -> dict:
"""
Expand All @@ -404,11 +408,11 @@ def get_domain_pricing(self, **kwargs) -> dict:
if r.status_code == 200:
return json.loads(r.text)
else:
raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n"
"Status code: {}\n"
"Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
response_json = json.loads(r.text)
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]:
def ssl_retrieve(self, domain, **kwargs) -> SSLCertBundle:
"""
API SSL bundle retrieve method: retrieve an SSL bundle for given domain
see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info
Expand All @@ -427,16 +431,14 @@ def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]:
if r.status_code == 200:
ssl_bundle = json.loads(r.text)

intermediate_certificate = ssl_bundle["intermediate_certificate"]
certificate_chain = ssl_bundle["certificate_chain"]
private_key = ssl_bundle["private_key"]
public_key = ssl_bundle["public_key"]

return intermediate_certificate, certificate_chain, private_key, public_key
return SSLCertBundle(intermediate_certificate=ssl_bundle["intermediate_certificate"],
certificate_chain=ssl_bundle["certificate_chain"],
private_key=ssl_bundle["private_key"],
public_key=ssl_bundle["public_key"])
else:
raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n"
"Status code: {}\n"
"Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
response_json = json.loads(r.text)
raise PKBClientException(response_json.get("status", "Unknown status"),
response_json.get("message", "Unknown message"))

@staticmethod
def __handle_error_backup__(dns_records):
Expand Down
17 changes: 15 additions & 2 deletions pkb_client/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,24 @@
from enum import Enum


class DNSRecordType(Enum):
A = "A"
AAAA = "AAAA"
MX = "MX"
CNAME = "CNAME"
ALIAS = "ALIAS"
TXT = "TXT"
NS = "NS"
SRV = "SRV"
TLSA = "TLSA"
CAA = "CAA"


@dataclass
class DNSRecord:
id: str
name: str
type: str
type: DNSRecordType
content: str
ttl: str
prio: str
Expand All @@ -17,7 +30,7 @@ def from_dict(d):
return DNSRecord(
id=d["id"],
name=d["name"],
type=d["type"],
type=DNSRecordType[d["type"]],
content=d["content"],
ttl=d["ttl"],
prio=d["prio"],
Expand Down
16 changes: 16 additions & 0 deletions pkb_client/ssl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import dataclass


@dataclass
class SSLCertBundle:
# The intermediate certificate.
intermediate_certificate: str

# The complete certificate chain.
certificate_chain: str

# The private key.
private_key: str

# The public key.
public_key: str

0 comments on commit 07ed442

Please sign in to comment.