-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #79 from pydsigner/GH-57_development-webserver
Add a development webserver (#57)
- Loading branch information
Showing
10 changed files
with
341 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
from __future__ import annotations | ||
|
||
import argparse | ||
import hashlib | ||
import http.server | ||
import mimetypes | ||
import os | ||
import pathlib | ||
import typing | ||
if typing.TYPE_CHECKING: | ||
from socketserver import _AfInetAddress | ||
|
||
|
||
INDEX_FILE = 'index.html' | ||
# Default used by nginx | ||
DEFAULT_MIME_TYPE = 'application/octet-stream' | ||
|
||
|
||
class ThreadedHTTPServer(http.server.ThreadingHTTPServer): | ||
""" | ||
A simple threaded HTTP server with a more powerful handler. | ||
""" | ||
def __init__(self, | ||
server_address: _AfInetAddress, | ||
directory: str | pathlib.Path = '.', | ||
bind_and_activate: bool = True) -> None: | ||
super().__init__(server_address, Handler, bind_and_activate) | ||
self.directory = str(directory) | ||
|
||
def finish_request(self, request, client_address) -> None: | ||
Handler(request, client_address, self, directory=self.directory) | ||
|
||
|
||
class Handler(http.server.SimpleHTTPRequestHandler): | ||
def get_etag(self, file_path): | ||
""" | ||
Generate an etag for a file based on its path and modification time. | ||
""" | ||
mtime = os.path.getmtime(file_path) | ||
file_size = os.path.getsize(file_path) | ||
file_info = f"{file_size}-{mtime}" | ||
return hashlib.md5(file_info.encode('utf-8')).hexdigest() | ||
|
||
def do_GET(self): | ||
try: | ||
# Get the etag for the file | ||
file_path = pathlib.Path(self.translate_path(self.path)) | ||
if file_path.is_dir(): | ||
file_path /= INDEX_FILE | ||
|
||
# Double-check that we haven't escaped the directory. | ||
# self.translate_path() should discard any suspicious path | ||
# components, but it's better to be safe. | ||
if not file_path.is_relative_to(self.directory): | ||
return self.send_error(403, 'Forbidden') | ||
|
||
etag = self.get_etag(file_path) | ||
# Check if the client already has the file | ||
if 'If-None-Match' in self.headers and self.headers['If-None-Match'] == etag: | ||
self.send_response(304) | ||
self.end_headers() | ||
else: | ||
# Get the file extension and set the MIME type accordingly | ||
mime_type, _enc = mimetypes.guess_type(file_path) | ||
self.send_response(200) | ||
self.send_header('Content-type', mime_type or DEFAULT_MIME_TYPE) | ||
self.send_header('ETag', etag) | ||
self.end_headers() | ||
# Serve the file | ||
with open(file_path, 'rb') as file: | ||
# Serve the file in chunks to avoid reading the entire file | ||
# into memory | ||
chunk_size = 8192 | ||
while True: | ||
chunk = file.read(chunk_size) | ||
if not chunk: | ||
break | ||
self.wfile.write(chunk) | ||
except FileNotFoundError: | ||
self.send_error(404, f'File Not Found: {self.path}') | ||
|
||
|
||
def serve(port: int, directory: str | pathlib.Path, host: str = 'localhost'): | ||
with ThreadedHTTPServer((host, port), directory=directory) as httpd: | ||
print(f'Serving at http://localhost:{port}') | ||
httpd.serve_forever() | ||
|
||
|
||
def main(arguments: list[str] | None = None): | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument('-p', '--port', | ||
help='port to serve from', | ||
type=int, | ||
default=8080) | ||
parser.add_argument('-d', '--directory', | ||
help='directory to serve', | ||
type=pathlib.Path, | ||
default='.') | ||
args = parser.parse_args(arguments) | ||
serve(args.port, args.directory) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
import json | ||
import pathlib | ||
import runpy | ||
import typing as t | ||
|
||
import anchovy.cli | ||
from anchovy.core import BuildSettings, Context, ContextDir, Rule | ||
from anchovy.custody import CONTEXT_DIR_KEYS | ||
|
||
|
||
MTIME_MODE_NONE = 0 | ||
MTIME_MODE_NE = 1 | ||
MTIME_MODE_EQ = 2 | ||
|
||
|
||
def get_context_dir(context: Context, key: str): | ||
path = pathlib.Path(key) | ||
if key in CONTEXT_DIR_KEYS: | ||
return key | ||
return t.cast('ContextDir', str(path.parents[-2])) | ||
|
||
|
||
def load_example(path: pathlib.Path): | ||
return runpy.run_path(str(path)) | ||
|
||
|
||
def load_artifact(path: pathlib.Path): | ||
with path.open() as file: | ||
return json.load(file) | ||
|
||
|
||
def load_context(path: pathlib.Path, tmp_dir: pathlib.Path, purge_dirs: bool = False): | ||
module_items = load_example(path) | ||
input_dir: pathlib.Path = module_items['SETTINGS']['input_dir'] | ||
artifact_path = tmp_dir / 'artifact.json' | ||
|
||
rules: list[Rule] = module_items['RULES'] | ||
settings = BuildSettings( | ||
input_dir=input_dir, | ||
output_dir=tmp_dir / 'output', | ||
working_dir=tmp_dir / 'working', | ||
custody_cache=artifact_path, | ||
purge_dirs=purge_dirs, | ||
) | ||
return Context(settings, rules) | ||
|
||
|
||
def run_example(path: pathlib.Path, tmp_dir: pathlib.Path, purge_dirs: bool = False): | ||
context = load_context(path, tmp_dir, purge_dirs) | ||
context.run() | ||
return context | ||
|
||
|
||
def run_example_cli(path: pathlib.Path, tmp_dir: pathlib.Path, purge_dirs: bool = False): | ||
context = load_context(path, tmp_dir, purge_dirs) | ||
context.custodian.bind(context) | ||
|
||
arguments = [ | ||
str(path), | ||
'--custody-cache', str(context['custody_cache']) | ||
] | ||
if purge_dirs: | ||
arguments.append('--purge') | ||
|
||
anchovy.cli.main(arguments) | ||
|
||
return context | ||
|
||
|
||
def canonicalize_graph(graph: dict): | ||
for key, val in graph.items(): | ||
if isinstance(val, list): | ||
val.sort() | ||
elif isinstance(val, dict): | ||
canonicalize_graph(val) | ||
|
||
return graph | ||
|
||
|
||
def compare_artifacts(old: dict, new: dict, context: Context, mtime_mode=MTIME_MODE_NONE): | ||
assert canonicalize_graph(new['graph']) == canonicalize_graph(old['graph']) | ||
assert new['meta'].keys() == old['meta'].keys() | ||
for key in new['meta']: | ||
n_type, n_dict = new['meta'][key] | ||
o_type, o_dict = old['meta'][key] | ||
print(f'{key}:\n new={n_dict}\n old={o_dict}') | ||
assert n_type == o_type | ||
if n_type == 'path': | ||
context_dir = get_context_dir(context, key) | ||
path = context.custodian.degenericize_path(key) | ||
if path.is_dir(): | ||
continue | ||
try: | ||
assert n_dict['sha1'] == o_dict['sha1'] | ||
assert n_dict['size'] == o_dict['size'] | ||
if mtime_mode == MTIME_MODE_NE and context_dir != 'input_dir': | ||
assert n_dict['m_time'] != o_dict['m_time'] | ||
elif mtime_mode == MTIME_MODE_EQ: | ||
assert n_dict['m_time'] == o_dict['m_time'] | ||
except AssertionError: | ||
print(path.read_bytes()) | ||
raise | ||
else: | ||
assert n_dict.keys() == o_dict.keys() |
Oops, something went wrong.