Skip to content

Commit

Permalink
Merge branch 'bind_file_support' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
infinityofspace committed Sep 29, 2024
2 parents 31cfc66 + 3fbf0bd commit d896a97
Show file tree
Hide file tree
Showing 16 changed files with 445 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkb_client/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .cli import main
29 changes: 22 additions & 7 deletions pkb_client/cli.py → pkb_client/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import textwrap
from datetime import datetime

from pkb_client.client import PKBClient, DNSRestoreMode, API_ENDPOINT
from pkb_client.dns import DNSRecordType
from pkb_client.forwarding import URLForwardingType
from pkb_client.client import PKBClient, API_ENDPOINT
from pkb_client.client.dns import DNSRecordType, DNSRestoreMode
from pkb_client.client.forwarding import URLForwardingType


class CustomJSONEncoder(json.JSONEncoder):
Expand All @@ -33,7 +33,7 @@ def main():
License: Apache-2.0 https://github.com/psf/requests/blob/master/LICENSE
setuptools:
Project: https://github.com/pypa/setuptools
License: MIT https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE
License: MIT https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE
""")
)

Expand Down Expand Up @@ -86,23 +86,38 @@ def main():
parser_dns_receive.set_defaults(func=PKBClient.dns_retrieve)
parser_dns_receive.add_argument("domain", help="The domain for which the DNS record should be retrieved.")

parser_dns_export = subparsers.add_parser("dns-export", help="Save all DNS records to a local file as json.")
parser_dns_export = subparsers.add_parser("dns-export", help="Save all DNS records to a local json file.")
parser_dns_export.set_defaults(func=PKBClient.dns_export)
parser_dns_export.add_argument("domain",
help="The domain for which the DNS record should be retrieved and saved.")
parser_dns_export.add_argument("filename", help="The filename where to save the exported DNS records.")

parser_dns_import = subparsers.add_parser("dns-import", help="Restore all DNS records from a local file.",
parser_dns_export_bind = subparsers.add_parser("dns-export-bind", help="Save all DNS records to a local BIND file.")
parser_dns_export_bind.set_defaults(func=PKBClient.dns_export_bind)
parser_dns_export_bind.add_argument("domain",
help="The domain for which the DNS record should be retrieved and saved.")
parser_dns_export_bind.add_argument("filename", help="The filename where to save the exported DNS records.")

parser_dns_import = subparsers.add_parser("dns-import", help="Restore all DNS records from a local json file.",
formatter_class=argparse.RawTextHelpFormatter)
parser_dns_import.set_defaults(func=PKBClient.dns_import)
parser_dns_import.add_argument("domain", help="The domain for which the DNS record should be restored.")
parser_dns_import.add_argument("filename", help="The filename from which the DNS records are to be restored.")
parser_dns_import.add_argument("restore_mode", help="""The restore mode (DNS records are identified by the record id):
clean: remove all existing DNS records and restore all DNS records from the provided file
clear: remove all existing DNS records and restore all DNS records from the provided file
replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records
keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist
""", type=DNSRestoreMode.from_string, choices=list(DNSRestoreMode))

parser_dns_import_bind = subparsers.add_parser("dns-import-bind",
help="Restore all DNS records from a local BIND file.",
formatter_class=argparse.RawTextHelpFormatter)
parser_dns_import_bind.set_defaults(func=PKBClient.dns_import_bind)
parser_dns_import_bind.add_argument("filename", help="The filename from which the DNS records are to be restored.")
parser_dns_import_bind.add_argument("restore_mode", help="""The restore mode (DNS records are identified by the record id):
clear: remove all existing DNS records and restore all DNS records from the provided file
""", type=DNSRestoreMode.from_string, choices=[DNSRestoreMode.clear])

parser_domain_pricing = subparsers.add_parser("domain-pricing", help="Get the pricing for Porkbun domains.")
parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing)

Expand Down
6 changes: 6 additions & 0 deletions pkb_client/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .bind_file import BindFile, BindRecord, RecordClass
from .client import PKBClient, PKBClientException, API_ENDPOINT
from .dns import DNSRecord, DNSRestoreMode, DNSRecordType
from .domain import DomainInfo
from .forwarding import URLForwarding, URLForwardingType
from .ssl_cert import SSLCertBundle
155 changes: 155 additions & 0 deletions pkb_client/client/bind_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List

from pkb_client.client.dns import DNSRecordType, DNS_RECORDS_WITH_PRIORITY


class RecordClass(str, Enum):
IN = "IN"

def __str__(self):
return self.value


@dataclass
class BindRecord:
name: str
ttl: int
record_class: RecordClass
record_type: DNSRecordType
data: str
prio: Optional[int] = None
comment: Optional[str] = None

def __str__(self):
record_string = f"{self.name} {self.ttl} {self.record_class} {self.record_type}"
if self.prio is not None:
record_string += f" {self.prio}"
record_string += f" {self.data}"
if self.comment:
record_string += f" ; {self.comment}"
return record_string


class BindFile:
origin: str
ttl: Optional[int] = None
records: List[BindRecord]

def __init__(self, origin: str, ttl: Optional[int] = None, records: Optional[List[BindRecord]] = None) -> None:
self.origin = origin
self.ttl = ttl
self.records = records or []

@staticmethod
def from_file(file_path: str) -> "BindFile":
with open(file_path, "r") as f:
file_data = f.readlines()

# parse the file line by line
origin = None
ttl = None
records = []
for line in file_data:
if line.startswith("$ORIGIN"):
origin = line.split()[1]
elif line.startswith("$TTL"):
ttl = int(line.split()[1])
else:
# parse the records with the two possible formats:
# 1: name ttl record-class record-type record-data
# 2: name record-class ttl record-type record-data
# whereby the ttl is optional

# drop any right trailing comments
line_parts = line.split(";", 1)
line = line_parts[0].strip()
comment = line_parts[1].strip() if len(line_parts) > 1 else None
prio = None

# skip empty lines
if not line:
continue

# find which format the line is
record_parts = line.split()
if record_parts[1].isdigit():
# scheme 1
if record_parts[3] not in DNSRecordType.__members__:
logging.warning(f"Ignoring unsupported record type: {line}")
continue
if record_parts[2] not in RecordClass.__members__:
logging.warning(f"Ignoring unsupported record class: {line}")
continue
record_name = record_parts[0]
record_ttl = int(record_parts[1])
record_class = RecordClass[record_parts[2]]
record_type = DNSRecordType[record_parts[3]]
if record_type in DNS_RECORDS_WITH_PRIORITY:
prio = int(record_parts[4])
record_data = " ".join(record_parts[5:])
else:
record_data = " ".join(record_parts[4:])
elif record_parts[2].isdigit():
# scheme 2
if record_parts[3] not in DNSRecordType.__members__:
logging.warning(f"Ignoring unsupported record type: {line}")
continue
if record_parts[1] not in RecordClass.__members__:
logging.warning(f"Ignoring unsupported record class: {line}")
continue
record_name = record_parts[0]
record_ttl = int(record_parts[2])
record_class = RecordClass[record_parts[1]]
record_type = DNSRecordType[record_parts[3]]
if record_type in DNS_RECORDS_WITH_PRIORITY:
prio = int(record_parts[4])
record_data = " ".join(record_parts[5:])
else:
record_data = " ".join(record_parts[4:])
else:
# no ttl, use default or previous
if record_parts[2] not in DNSRecordType.__members__:
logging.warning(f"Ignoring unsupported record type: {line}")
continue
if record_parts[1] not in RecordClass.__members__:
logging.warning(f"Ignoring unsupported record class: {line}")
continue
record_name = record_parts[0]
if ttl is None and not records:
raise ValueError("No TTL found in file")
record_ttl = ttl or records[-1].ttl
record_class = RecordClass[record_parts[1]]
record_type = DNSRecordType[record_parts[2]]
if record_type in DNS_RECORDS_WITH_PRIORITY:
prio = int(record_parts[3])
record_data = " ".join(record_parts[4:])
else:
record_data = " ".join(record_parts[3:])

# replace @ in record name with origin
record_name = record_name.replace("@", origin)

records.append(BindRecord(record_name, record_ttl, record_class, record_type, record_data, prio=prio,
comment=comment))

if origin is None:
raise ValueError("No origin found in file")

return BindFile(origin, ttl, records)

def to_file(self, file_path: str) -> None:
with open(file_path, "w") as f:
f.write(str(self))

def __str__(self) -> str:
bind = f"$ORIGIN {self.origin}\n"

if self.ttl is not None:
bind += f"$TTL {self.ttl}\n"

for record in self.records:
bind += f"{record}\n"
return bind
Loading

0 comments on commit d896a97

Please sign in to comment.