diff --git a/worker/games.py b/worker/games.py index 3d3c9c770..ff29165f5 100644 --- a/worker/games.py +++ b/worker/games.py @@ -3,6 +3,7 @@ import hashlib import io import json +import logging import math import multiprocessing import os @@ -24,6 +25,8 @@ from queue import Empty, Queue from zipfile import ZipFile +logger = logging.getLogger(__name__) + try: import requests except ImportError: @@ -91,16 +94,11 @@ def backup_log(): logfile = Path(__file__).resolve().parent / LOGFILE logfile_previous = logfile.with_suffix(logfile.suffix + ".previous") if logfile.exists(): - print("Moving logfile {} to {}".format(logfile, logfile_previous)) + logger.info("Moving logfile {} to {}".format(logfile, logfile_previous)) with LOG_LOCK: logfile.replace(logfile_previous) except Exception as e: - print( - "Exception moving log:\n", - e, - sep="", - file=sys.stderr, - ) + logger.error("Exception moving log: " + str(e)) def str_signal(signal_): @@ -148,12 +146,7 @@ def update_atime(path): mtime = os.stat(path).st_mtime os.utime(path, times=(atime, mtime)) except OSError as e: - print( - f"Failed to update the atime of {path}:\n", - e, - sep="", - file=sys.stderr, - ) + logger.error(f"Failed to update the atime of {path}: " + str(e)) def cache_read(cache, name): @@ -213,12 +206,7 @@ def requests_get(remote, *args, **kw): result = requests.get(remote, *args, **kw) result.raise_for_status() # also catch return codes >= 400 except Exception as e: - print( - "Exception in requests.get():\n", - e, - sep="", - file=sys.stderr, - ) + logger.error("Exception in requests.get(): " + str(e)) raise WorkerException("Get request to {} failed".format(remote), e=e) return result @@ -229,12 +217,7 @@ def requests_post(remote, *args, **kw): try: result = requests.post(remote, *args, **kw) except Exception as e: - print( - "Exception in requests.post():\n", - e, - sep="", - file=sys.stderr, - ) + logger.error("Exception in requests.post(): " + str(e)) raise WorkerException("Post request to {} failed".format(remote), e=e) return result @@ -261,15 +244,10 @@ def send_api_post_request(api_url, payload, quiet=False): api_url ) ) - print( - "Exception in send_api_post_request():\n", - message, - sep="", - file=sys.stderr, - ) + logger.error("Exception in send_api_post_request(): " + message) raise WorkerException(message) if "error" in response: - print("Error from remote: {}".format(response["error"])) + logger.error("Error from remote: {}".format(response["error"])) t1 = datetime.now(timezone.utc) w = 1000 * (t1 - t0).total_seconds() @@ -283,8 +261,8 @@ def send_api_post_request(api_url, payload, quiet=False): ) if not quiet: if "info" in response: - print("Info from remote: {}".format(response["info"])) - print( + logger.info("Info from remote: {}".format(response["info"])) + logger.info( "Post request {} handled in {:.2f}ms (server: {:.2f}ms)".format( api_url, w, s ) @@ -301,7 +279,7 @@ def github_api(repo): def required_nets(engine): nets = {} pattern = re.compile(r"(EvalFile\w*)\s+.*\s+(nn-[a-f0-9]{12}.nnue)") - print("Obtaining EvalFile of {}...".format(engine.name)) + logger.info("Obtaining EvalFile of {}...".format(engine.name)) try: with subprocess.Popen( [engine, "uci"], @@ -358,17 +336,17 @@ def fetch_validated_net(remote, testing_dir, net, global_cache): if content is None: url = f"{remote}/api/nn/{net}" - print(f"Downloading {net}") + logger.info(f"Downloading {net}") content = requests_get(url, allow_redirects=True, timeout=HTTP_TIMEOUT).content if not is_valid_net(content, net): return False 
        cache_write(global_cache, net, content)
     else:
         if not is_valid_net(content, net):
-            print(f"Removing invalid {net} from global cache")
+            logger.warning(f"Removing invalid {net} from global cache")
             cache_remove(global_cache, net)
             return False
-        print(f"Using {net} from global cache")
+        logger.info(f"Using {net} from global cache")
 
     (testing_dir / net).write_bytes(content)
     return True
@@ -403,9 +381,9 @@ def establish_validated_net(remote, testing_dir, net, global_cache):
             if attempt > 5:
                 raise
             waitTime = UPDATE_RETRY_TIME * attempt
-            print(
-                f"Failed to fetch {net} in attempt {attempt},",
-                f"trying in {waitTime} seconds",
+            logger.warning(
+                f"Failed to fetch {net} in attempt {attempt}, "
+                + f"trying in {waitTime} seconds"
             )
             time.sleep(waitTime)
 
@@ -465,12 +443,12 @@ def run_parallel_benches(engine, concurrency, threads, hash_size, depth):
 
 def get_bench_nps(engine, games_concurrency, threads, hash_size):
     _depth, depth = 11, 13
-    print("Warmup for bench...")
+    logger.info("Warmup for bench...")
     results = run_parallel_benches(
         engine, games_concurrency, threads, hash_size, _depth
     )
-    print(f"...done in {results[0][0]:.2f}ms")
-    print("Running bench...")
+    logger.info(f"...done in {results[0][0]:.2f}ms")
+    logger.info("Running bench...")
     results = run_parallel_benches(engine, games_concurrency, threads, hash_size, depth)
 
     bench_nodes_values = [bn for _, bn in results]
@@ -484,7 +462,7 @@ def get_bench_nps(engine, games_concurrency, threads, hash_size):
     max_nps = max(bench_nps_values)
     stdev_nps = statistics.stdev(bench_nps_values) if len(bench_nps_values) > 1 else 0
 
-    print(
+    logger.info(
         f"Statistics for {engine.name}:\n"
         f"{'Concurrency':<15}: {games_concurrency:15.2f}\n"
         f"{'Threads':<15}: {threads:15.2f}\n"
@@ -502,9 +480,9 @@ def get_bench_nps(engine, games_concurrency, threads, hash_size):
 
 def verify_signature(engine, signature):
     hash_size, threads, depth = 16, 1, 13
-    print("Computing engine signature...")
+    logger.info("Computing engine signature...")
     bench_time, bench_nodes = run_single_bench(engine, hash_size, threads, depth)
-    print(f"...done in {bench_time:.2f}ms")
+    logger.info(f"...done in {bench_time:.2f}ms")
     if int(bench_nodes) != int(signature):
         message = (
             f"Wrong bench in {engine.name}, "
@@ -538,7 +516,7 @@ def download_from_github_raw(
     item, owner="official-stockfish", repo="books", branch="master"
 ):
     item_url = "{}/{}/{}/{}/{}".format(RAWCONTENT_HOST, owner, repo, branch, item)
-    print("Downloading {}".format(item_url))
+    logger.info("Downloading {}".format(item_url))
     return requests_get(item_url, timeout=HTTP_TIMEOUT).content
 
 
@@ -548,7 +526,7 @@ def download_from_github_api(
     item_url = "{}/repos/{}/{}/contents/{}?ref={}".format(
         API_HOST, owner, repo, item, branch
     )
-    print("Downloading {}".format(item_url))
+    logger.info("Downloading {}".format(item_url))
     git_url = requests_get(item_url, timeout=HTTP_TIMEOUT).json()["git_url"]
     return b64decode(requests_get(git_url, timeout=HTTP_TIMEOUT).json()["content"])
 
@@ -561,7 +539,7 @@ def download_from_github(
     except FatalException:
         raise
     except Exception as e:
-        print(f"Downloading {item} failed: {str(e)}. Trying the GitHub api.")
+        logger.warning(f"Downloading {item} failed: {str(e)}. Trying the GitHub api.")
         try:
             blob = download_from_github_api(item, owner=owner, repo=repo, branch=branch)
         except Exception as e:
@@ -669,7 +647,9 @@ def make_targets():
             break
 
     except (OSError, subprocess.SubprocessError) as e:
-        print("Exception while executing make help:\n", e, sep="", file=sys.stderr)
+        logger.error(
+            "Exception while executing make help: " + str(e)
+        )
         raise FatalException("It appears 'make' is not properly installed")
 
     if p.returncode != 0:
@@ -688,7 +668,7 @@ def find_arch(compiler):
 
     # recent SF support a native target
     if "native" in targets:
-        print("Using native target architecture")
+        logger.info("Using native target architecture")
         return "native"
 
     # older SF will need to fall back to this implementation
@@ -758,9 +738,9 @@ def find_arch(compiler):
     else:
         arch = "x86-32"
 
-    print("Available Makefile architecture targets: ", targets)
-    print("Available g++/cpu properties: ", props)
-    print("Determined the best architecture to be ", arch)
+    logger.info("Available Makefile architecture targets: {}".format(targets))
+    logger.info("Available g++/cpu properties: {}".format(props))
+    logger.info("Determined the best architecture to be {}".format(arch))
 
     return arch
 
@@ -808,12 +788,12 @@ def setup_engine(
 
     if blob is None:
         item_url = github_api(repo_url) + "/zipball/" + sha
-        print("Downloading {}".format(item_url))
+        logger.info("Downloading {}".format(item_url))
         blob = requests_get(item_url).content
         blob_needs_write = True
     else:
         blob_needs_write = False
-        print("Using {} from global cache".format(sha + ".zip"))
+        logger.info("Using {} from global cache".format(sha + ".zip"))
 
     file_list = unzip(blob, tmp_dir)
     # once unzipped without error we can write as needed
@@ -826,7 +806,7 @@ def setup_engine(
     os.chdir(build_dir)
 
     for net in required_nets_from_source():
-        print("Build uses default net:", net)
+        logger.info("Build uses default net: " + net)
         establish_validated_net(remote, testing_dir, net, global_cache)
         shutil.copyfile(testing_dir / net, net)
@@ -904,7 +884,7 @@ def setup_engine(
 
 def kill_process(p):
     p_name = os.path.basename(p.args[0])
-    print("Killing {} with PID {}... ".format(p_name, p.pid), end="", flush=True)
+    logger.info("Killing {} with PID {}... ".format(p_name, p.pid))
     try:
         if IS_WINDOWS:
             # p.kill() doesn't kill subprocesses on Windows.
@@ -916,16 +896,14 @@ def kill_process(p): else: p.kill() except Exception as e: - print( - "\nException killing {} with PID {}, possibly already terminated:\n".format( + logger.error( + "Exception killing {} with PID {}, possibly already terminated: ".format( p_name, p.pid - ), - e, - sep="", - file=sys.stderr, + ) + + str(e) ) else: - print("killed", flush=True) + logger.info("killed") def adjust_tc(tc, factor): @@ -957,7 +935,7 @@ def adjust_tc(tc, factor): scaled_tc = "{}/{}".format(num_moves, scaled_tc) tc_limit *= 100.0 / num_moves - print("CPU factor : {} - tc adjusted to {}".format(factor, scaled_tc)) + logger.info("CPU factor : {} - tc adjusted to {}".format(factor, scaled_tc)) return scaled_tc, tc_limit @@ -1022,13 +1000,13 @@ def shorten_hash(match): t_error.start() end_time = datetime.now(timezone.utc) + timedelta(seconds=tc_limit) - print("TC limit {} End time: {}".format(tc_limit, end_time)) + logger.info("TC limit {} End time: {}".format(tc_limit, end_time)) num_games_updated = 0 while datetime.now(timezone.utc) < end_time: if current_state["task_id"] is None: # This task is no longer necessary - print(finished_task_message) + logger.info(finished_task_message) return False try: line = q.get_nowait().strip() @@ -1039,7 +1017,7 @@ def shorten_hash(match): continue line = hash_pattern.sub(shorten_hash, line) - print(line, flush=True) + logger.info(line) # Do we have a pgn crc? if "has CRC32:" in line: @@ -1048,7 +1026,7 @@ def shorten_hash(match): # Have we reached the end of the match? Then just exit. if "Finished match" in line: if num_games_updated == games_to_play: - print("Finished match cleanly") + logger.info("Finished match cleanly") else: raise WorkerException( "Finished match uncleanly {} vs. required {}".format( @@ -1148,18 +1126,13 @@ def shorten_hash(match): if "error" in response: break except Exception as e: - print( - "Exception calling update_task:\n", - e, - sep="", - file=sys.stderr, - ) + logger.error("Exception calling update_task: " + str(e)) if isinstance(e, FatalException): # signal raise e else: if not response["task_alive"]: # This task is no longer necessary - print(finished_task_message) + logger.info(finished_task_message) return False update_succeeded = True num_games_updated = num_games_finished @@ -1200,7 +1173,7 @@ def launch_fastchess( if not req["task_alive"]: # This task is no longer necessary - print( + logger.info( "The server told us that no more games are needed for the current task." ) return False @@ -1283,23 +1256,18 @@ def launch_fastchess( try: send_sigint(p) except Exception as e: - print("\nException in send_sigint:\n", e, sep="", file=sys.stderr) + logger.error("Exception in send_sigint: " + str(e)) # now wait... - print("\nWaiting for fastchess to finish... ", end="", flush=True) + logger.info("Waiting for fastchess to finish... ") try: p.wait(timeout=FASTCHESS_KILL_TIMEOUT) except subprocess.TimeoutExpired: - print("timeout", flush=True) + logger.warning("timeout") kill_process(p) else: - print("done", flush=True) + logger.info("done") except (OSError, subprocess.SubprocessError) as e: - print( - "Exception starting fastchess:\n", - e, - sep="", - file=sys.stderr, - ) + logger.error("Exception starting fastchess: " + str(e)) raise WorkerException("Unable to start fastchess. Error: {}".format(str(e))) return task_alive @@ -1383,7 +1351,9 @@ def run_games( opening_offset = task.get("start", task_id * task["num_games"]) if "start" in task: - print("Variable task sizes used. 
Opening offset = {}".format(opening_offset))
+        logger.info(
+            "Variable task sizes used. Opening offset = {}".format(opening_offset)
+        )
 
     start_game_index = opening_offset + input_total_games
     run_seed = int(hashlib.sha1(run["_id"].encode("utf-8")).hexdigest(), 16) % (2**64)
@@ -1408,22 +1378,15 @@ def format_fastchess_options(options):
             reverse=True,
         )
     except Exception as e:
-        print(
-            "Failed to obtain access time of old engine binary:\n",
-            e,
-            sep="",
-            file=sys.stderr,
-        )
+        logger.error("Failed to obtain access time of old engine binary: " + str(e))
     else:
         for old_engine in engines[num_bkps:]:
             try:
                 old_engine.unlink()
             except Exception as e:
-                print(
-                    "Failed to remove an old engine binary {}:\n".format(old_engine),
-                    e,
-                    sep="",
-                    file=sys.stderr,
+                logger.error(
+                    "Failed to remove an old engine binary {}: ".format(old_engine)
+                    + str(e)
                 )
 
     # Build from sources new and base engines as needed.
@@ -1470,11 +1433,8 @@ def format_fastchess_options(options):
             try:
                 old_net.unlink()
             except Exception as e:
-                print(
-                    "Failed to remove an old network {}:\n".format(old_net),
-                    e,
-                    sep="",
-                    file=sys.stderr,
+                logger.error(
+                    "Failed to remove an old network {}: ".format(old_net) + str(e)
                 )
 
     # Add EvalFile* with full path to fastchess options, and download the networks if missing.
diff --git a/worker/sri.txt b/worker/sri.txt
index 0288f8d42..c5106a39d 100644
--- a/worker/sri.txt
+++ b/worker/sri.txt
@@ -1 +1 @@
-{"__version": 276, "updater.py": "+i4UI6vDlgNrZ/A/mCOGN820HJX2L896A75KKbGT10DeOXJQwwsleqEzNJpdysou", "worker.py": "nr3Dz7DJ0b69FVQ3QQ3iL/tNbbOklUH6uS50WGW75HwBzgxPzJLxXBKJrEob+4i/", "games.py": "IQMDSd55VldF72RZRCTMTZF8yFFM6exjifkw2y2Cy85bwfAk3Z90/CvAnUFUmOJH"}
+{"__version": 276, "updater.py": "+i4UI6vDlgNrZ/A/mCOGN820HJX2L896A75KKbGT10DeOXJQwwsleqEzNJpdysou", "worker.py": "kjcHe3fqUeK9xaSI1JPk5mZwHLs5fpcrbvQcARLL7S0ptJyumPeu2AIIDh4LYmQy", "games.py": "Kq2M5SzmrkehQPRrgRuwGwIHGV9s6nGporvFWx9sVzjl4+Lk1PMThyE8yUXV3hhS"}
diff --git a/worker/worker.py b/worker/worker.py
index 6dcd733df..478de1f6c 100644
--- a/worker/worker.py
+++ b/worker/worker.py
@@ -5,6 +5,7 @@ import hashlib
 import io
 import json
+import logging
 import multiprocessing
 import os
 import platform
@@ -26,6 +27,9 @@ from functools import partial
 from pathlib import Path
 
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("worker")
+
 try:
     from expression import Expression_Parser
 except ImportError:
@@ -159,7 +163,7 @@ def _alpha_numeric(x):
     if x == "_hw":
         return x
     if len(x) <= 1:
-        print("The prefix {} is too short".format(x))
+        logger.error("The prefix {} is too short".format(x))
         raise ValueError(x)
     if not all(ord(c) < 128 for c in x) or not x.isalnum():
         raise ValueError(x)
@@ -181,7 +185,7 @@ def __call__(self, x):
         try:
             ret = round(max(min(e.parse(x), self.MAX), 0))
         except Exception:
-            print("Unable to parse expression for max_memory")
+            logger.error("Unable to parse expression for max_memory")
             raise ValueError(x)
         return x, ret
@@ -199,15 +203,15 @@ def __call__(self, x):
         try:
             ret = round(e.parse(x))
         except Exception:
-            print("Unable to parse expression for concurrency")
+            logger.error("Unable to parse expression for concurrency")
             raise ValueError(x)
 
         if ret <= 0:
-            print("concurrency must be at least 1")
+            logger.error("concurrency must be at least 1")
             raise ValueError(x)
 
         if ("MAX" not in x and ret >= self.MAX) or ret > self.MAX:
-            print(
+            logger.error(
                 (
                     "\nYou cannot have concurrency {} but at most:\n"
                     "  a) {} with '--concurrency MAX';\n"
@@ -228,7 +232,7 @@ def safe_sleep(f):
     try:
         time.sleep(f)
     except 
Exception:
-        print("\nSleep interrupted...")
+        logger.info("Sleep interrupted...")
 
 
 def text_hash(file):
@@ -247,12 +251,7 @@ def generate_sri(install_dir):
         try:
             sri[file] = text_hash(item)
         except Exception as e:
-            print(
-                "Exception computing sri hash of {}:\n".format(item),
-                e,
-                sep="",
-                file=sys.stderr,
-            )
+            logger.error("Exception computing sri hash of {}: ".format(item) + str(e))
             return None
     return sri
 
@@ -260,7 +259,7 @@ def generate_sri(install_dir):
 def write_sri(install_dir):
     sri = generate_sri(install_dir)
     sri_file = install_dir / "sri.txt"
-    print("Writing sri hashes to {}".format(sri_file))
+    logger.info("Writing sri hashes to {}".format(sri_file))
     with open(sri_file, "w") as f:
         json.dump(sri, f)
         f.write("\n")
@@ -278,10 +277,10 @@ def verify_sri(install_dir):
         with open(sri_file, "r") as f:
             sri_ = json.load(f)
     except Exception as e:
-        print("Exception reading {}:\n".format(sri_file), e, sep="", file=sys.stderr)
+        logger.error("Exception reading {}: ".format(sri_file) + str(e))
         return False
     if not isinstance(sri_, dict):
-        print("The file {} does not contain a dictionary".format(sri_file))
+        logger.error("The file {} does not contain a dictionary".format(sri_file))
         return False
     for k, v in sri.items():
         # When we update, the running worker is not the same as the one we are verifying.
@@ -290,9 +289,9 @@ def verify_sri(install_dir):
         if k == "__version":
             continue
         if k not in sri_ or v != sri_[k]:
-            print("The value for {} is incorrect in {}".format(k, sri_file))
+            logger.error("The value for {} is incorrect in {}".format(k, sri_file))
             return False
-    print("The file {} matches the worker files!".format(sri_file))
+    logger.info("The file {} matches the worker files!".format(sri_file))
     return True
 
@@ -313,22 +312,24 @@ def verify_remote_sri(install_dir):
     # False : verification failed
     # None : network error: unable to verify
     sri = generate_sri(install_dir)
+    if sri is None:
+        return None
     sri_ = download_sri()
     if sri_ is None:
         return None
     version = sri_.get("__version", -1)
     if version != WORKER_VERSION:
-        print("The master sri file has a different version number. Ignoring!")
+        logger.info("The master sri file has a different version number. 
Ignoring!") return True tainted = False for k, v in sri_.items(): if k not in sri or v != sri[k]: - print("{} has been modified!".format(k)) + logger.warning("{} has been modified!".format(k)) tainted = True if tainted: - print("This worker is tainted...") + logger.warning("This worker is tainted...") else: - print("Running an unmodified worker...") + logger.info("Running an unmodified worker...") return not tainted @@ -341,7 +342,7 @@ def verify_credentials(remote, username, password, cached): # username/password req = {} if username != "" and password != "": - print( + logger.info( "Confirming {} credentials with {}".format( "cached" if cached else "supplied", remote ) @@ -355,14 +356,14 @@ def verify_credentials(remote, username, password, cached): return None # network problem (unrecoverable) if "error" in req: return False # invalid username/password - print("Credentials ok!") + logger.info("Credentials ok!") return True return False # empty username or password def get_credentials(config, options, args): remote = "{}://{}:{}".format(options.protocol, options.host, options.port) - print("Worker version {} connecting to {}".format(WORKER_VERSION, remote)) + logger.info("Worker version {} connecting to {}".format(WORKER_VERSION, remote)) username = config.get("login", "username") password = config.get("login", "password", raw=True) @@ -396,7 +397,7 @@ def get_credentials(config, options, args): def verify_fastchess(fastchess_path, fastchess_sha): # Verify that fastchess is working and has the required minimum version. - print(f"Obtaining version info for {fastchess_path}...") + logger.info(f"Obtaining version info for {fastchess_path}...") try: with subprocess.Popen( [fastchess_path, "--version"], @@ -414,26 +415,26 @@ def verify_fastchess(fastchess_path, fastchess_sha): for line in iter(p.stdout.readline, ""): m = pattern.search(line) if m: - print("Found", line.strip()) + logger.info("Found " + line.strip()) short_sha = m.group(1) except (OSError, subprocess.SubprocessError) as e: - print(f"Running fastchess raised {type(e).__name__}: {e}") + logger.error(f"Running fastchess raised {type(e).__name__}: {e}") return False if p.returncode != 0: - print( + logger.error( f"Unable to run fastchess. Return code: {format_returncode(p.returncode)}. 
Error: {errors}" ) return False if len(short_sha) < 7: - print( + logger.error( "Unable to find a suitable sha of length 7 or more in the fastchess version" ) return False if not fastchess_sha.startswith(short_sha): - print( + logger.error( f"fastchess sha {fastchess_sha} required but the version shows {short_sha}" ) return False @@ -450,11 +451,11 @@ def setup_fastchess(worker_dir, compiler, concurrency, global_cache, tests=False try: fastchess_path.unlink() except Exception as e: - print(f"Removing fastchess raised {type(e).__name__}: {e}") + logger.error(f"Removing fastchess raised {type(e).__name__}: {e}") return False tmp_dir = Path(tempfile.mkdtemp(dir=worker_dir)) try: - print("Building fastchess from sources...") + logger.info("Building fastchess from sources...") should_cache = False blob = cache_read(global_cache, FASTCHESS_SHA + ".zip") @@ -465,11 +466,11 @@ def setup_fastchess(worker_dir, compiler, concurrency, global_cache, tests=False + "/fastchess/zipball/" + FASTCHESS_SHA ) - print("Downloading {}".format(item_url)) + logger.info("Downloading {}".format(item_url)) blob = requests_get(item_url).content should_cache = True else: - print("Using {} from global cache".format(FASTCHESS_SHA + ".zip")) + logger.info("Using {} from global cache".format(FASTCHESS_SHA + ".zip")) file_list = unzip(blob, tmp_dir) build_dir = tmp_dir / os.path.commonprefix([n.filename for n in file_list]) @@ -490,7 +491,7 @@ def setup_fastchess(worker_dir, compiler, concurrency, global_cache, tests=False ] for cmd in cmds: - print(cmd) + logger.info(cmd) with subprocess.Popen( cmd, shell=True, @@ -520,11 +521,8 @@ def setup_fastchess(worker_dir, compiler, concurrency, global_cache, tests=False (build_dir / "fastchess").with_suffix(EXE_SUFFIX).replace(fastchess_path) except Exception as e: - print( - "Exception downloading, extracting or building fastchess:\n", - e, - sep="", - file=sys.stderr, + logger.error( + "Exception downloading, extracting or building fastchess: " + str(e) ) return False else: @@ -549,7 +547,7 @@ def validate(config, schema): if callable(v[4]): # prepocessor o1 = v[4](o) if o1 != o: - print( + logger.warning( "Replacing the value '{}' of config option '{}' by '{}'".format( o, v[1], o1 ) @@ -562,7 +560,7 @@ def validate(config, schema): _ = t(o) except Exception: # v[2] is the default - print( + logger.warning( "The value '{}' of config option '{}' is not of type '{}'.\n" "Replacing it by the default value '{}'".format( o, v[1], v[3].__name__, v[2] @@ -570,7 +568,7 @@ def validate(config, schema): ) config.set(*v[:3]) elif o not in t: # choices - print( + logger.warning( "The value '{}' of config option '{}' is not in {}.\n" "Replacing it by the default value '{}'".format(o, v[1], v[3], v[2]) ) @@ -581,12 +579,12 @@ def validate(config, schema): schema_options = [v[:2] for v in schema] for section in config.sections(): if section not in schema_sections: - print("Removing unknown config section '{}'".format(section)) + logger.warning("Removing unknown config section '{}'".format(section)) config.remove_section(section) continue for option in config.options(section): if (section, option) not in schema_options: - print("Removing unknown config option '{}'".format(option)) + logger.warning("Removing unknown config option '{}'".format(option)) config.remove_option(section, option) @@ -597,13 +595,8 @@ def setup_parameters(worker_dir): try: config.read(config_file) except Exception as e: - print( - "Exception reading configfile {}:\n".format(config_file), - e, - sep="", - file=sys.stderr, 
- ) - print("Initializing configfile") + logger.error("Exception reading configfile {}: ".format(config_file) + str(e)) + logger.info("Initializing configfile") config = ConfigParser(inline_comment_prefixes=";", interpolation=None) # Step 2: probe the host system. @@ -623,12 +616,12 @@ def setup_parameters(worker_dir): cmd = "sysctl hw.memsize" else: cmd = "" - print("Unknown system") + logger.error("Unknown system") with os.popen(cmd) as proc: mem_str = str(proc.readlines()) mem = int(re.search(r"\d+", mem_str).group()) except Exception as e: - print("Exception checking HW info:\n", e, sep="", file=sys.stderr) + logger.error("Exception checking HW info: " + str(e)) return None # Step 2b: determine the number of cores. @@ -636,13 +629,13 @@ def setup_parameters(worker_dir): try: max_cpu_count = multiprocessing.cpu_count() except Exception as e: - print("Exception checking the CPU cores count:\n", e, sep="", file=sys.stderr) + logger.error("Exception checking the CPU cores count: " + str(e)) return None # Step 2c: detect the available compilers. compilers = detect_compilers() if compilers == {}: - print("No usable compilers found") + logger.error("No usable compilers found") return None # Step 3: validate config options and replace missing or invalid @@ -808,11 +801,11 @@ def my_error(e): try: (options, args) = parser.parse_known_args() except Exception as e: - print(str(e)) + logger.error(str(e)) return None if len(args) not in [0, 2]: - print("Unparsed command line arguments: {}".format(" ".join(args))) + logger.error("Unparsed command line arguments: {}".format(" ".join(args))) parser.print_usage() return None @@ -823,11 +816,11 @@ def my_error(e): if options.protocol == "http" and options.port == 443: # Rewrite old port 443 to 80 - print("Changing port to 80") + logger.info("Changing port to 80") options.port = 80 elif options.protocol == "https" and options.port == 80: # Rewrite old port 80 to 443 - print("Changing port to 443") + logger.info("Changing port to 443") options.port = 443 # Limit concurrency so that at least STC tests can run with the evailable memory @@ -838,7 +831,7 @@ def my_error(e): fc_memory = 60 max_concurrency = int((options.max_memory - fc_memory) / STC_memory) if max_concurrency < 1: - print( + logger.error( "You need to reserve at least {} MiB to run the worker!".format( STC_memory + fc_memory ) @@ -846,30 +839,30 @@ def my_error(e): return None options.concurrency_reduced = False if max_concurrency < options.concurrency: - print( + logger.warning( "Changing concurrency to allow for running STC tests with the available memory" ) - print( + logger.info( "The required memory to run with {} concurrency is {} MiB".format( options.concurrency, STC_memory * options.concurrency + fc_memory ) ) - print("The concurrency has been reduced to {}".format(max_concurrency)) - print("Consider increasing max_memory if possible") + logger.info("The concurrency has been reduced to {}".format(max_concurrency)) + logger.info("Consider increasing max_memory if possible") options.concurrency = max_concurrency options.concurrency_reduced = True options.compiler = compilers[options.compiler_] options.hw_id = hw_id(config.getint("private", "hw_seed")) - print("Default uuid_prefix: {}".format(options.hw_id)) + logger.info("Default uuid_prefix: {}".format(options.hw_id)) # Step 6: determine credentials. 
username, password = get_credentials(config, options, args) if username == "": - print("Invalid or missing credentials") + logger.error("Invalid or missing credentials") return None options.username = username @@ -909,15 +902,15 @@ def my_error(e): # Step 8: give some feedback to the user. - print( + logger.info( "System memory determined to be: {:.3f}GiB".format(mem / (1024 * 1024 * 1024)) ) - print( + logger.info( "Worker constraints: {{'concurrency': {}, 'max_memory': {}, 'min_threads': {}}}".format( options.concurrency, options.max_memory, options.min_threads ) ) - print("Config file {} written".format(config_file)) + logger.info("Config file {} written".format(config_file)) return options @@ -961,7 +954,7 @@ def get_machine_id(): name = "MachineGuid" machine_id = read_winreg(path, name) if machine_id is not None: - print( + logger.info( "machine_id {} obtained from HKEY_LOCAL_MACHINE\\{}\\{}".format( machine_id, path, name ) @@ -988,12 +981,7 @@ def get_machine_id(): break machine_uuid_str += line except Exception as e: - print( - "Exception while reading the machine_id:\n", - e, - sep="", - file=sys.stderr, - ) + logger.error("Exception while reading the machine_id: " + str(e)) if machine_uuid_str != "": match_obj = re.compile( "[A-Z,0-9]{8,8}-" @@ -1005,7 +993,7 @@ def get_machine_id(): result = match_obj.findall(machine_uuid_str) if len(result) > 0: machine_id = result[0] - print("machine_id {} obtained via '{}'".format(machine_id, cmd)) + logger.info("machine_id {} obtained via '{}'".format(machine_id, cmd)) return machine_id elif os.name == "posix": host_ids = ["/etc/machine-id", "/var/lib/dbus/machine-id"] @@ -1013,11 +1001,13 @@ def get_machine_id(): try: with open(file) as f: machine_id = f.read().strip() - print("machine_id {} obtained from {}".format(machine_id, file)) + logger.info( + "machine_id {} obtained from {}".format(machine_id, file) + ) return machine_id except Exception: pass - print("Unable to obtain the machine id") + logger.warning("Unable to obtain the machine id") return "" @@ -1042,12 +1032,7 @@ def get_remaining_github_api_calls(): rate.raise_for_status() return rate.json()["resources"]["core"]["remaining"] except Exception as e: - print( - "Exception fetching rate_limit (invalid ~/.netrc?):\n", - e, - sep="", - file=sys.stderr, - ) + logger.error("Exception fetching rate_limit (invalid ~/.netrc?): " + str(e)) return 0 @@ -1064,7 +1049,7 @@ def gcc_version(): ) as p: for line in iter(p.stdout.readline, ""): if "__clang_major__" in line: - print("clang++ poses as g++") + logger.warning("clang++ poses as g++") return None if "__GNUC__" in line: major = line.split()[2] @@ -1073,10 +1058,12 @@ def gcc_version(): if "__GNUC_PATCHLEVEL__" in line: patchlevel = line.split()[2] except (OSError, subprocess.SubprocessError): - print("No g++ or g++ is not executable") + logger.info("No g++ or g++ is not executable") return None if p.returncode != 0: - print("g++ version query failed with return code {}".format(p.returncode)) + logger.warning( + "g++ version query failed with return code {}".format(p.returncode) + ) return None try: @@ -1085,17 +1072,17 @@ def gcc_version(): patchlevel = int(patchlevel) compiler = "g++" except Exception: - print("Failed to parse g++ version.") + logger.warning("Failed to parse g++ version.") return None if (major, minor) < (MIN_GCC_MAJOR, MIN_GCC_MINOR): - print( + logger.warning( "Found g++ version {}.{}.{}. 
First usable version is {}.{}.0".format( major, minor, patchlevel, MIN_GCC_MAJOR, MIN_GCC_MINOR ) ) return None - print("Found {} version {}.{}.{}".format(compiler, major, minor, patchlevel)) + logger.info("Found {} version {}.{}.{}".format(compiler, major, minor, patchlevel)) return (compiler, major, minor, patchlevel) @@ -1118,10 +1105,12 @@ def clang_version(): if "__clang_patchlevel__" in line: clang_patchlevel = line.split()[2] except (OSError, subprocess.SubprocessError): - print("No clang++ or clang++ is not executable") + logger.info("No clang++ or clang++ is not executable") return None if p.returncode != 0: - print("clang++ version query failed with return code {}".format(p.returncode)) + logger.warning( + "clang++ version query failed with return code {}".format(p.returncode) + ) return None try: major = int(clang_major) @@ -1129,11 +1118,11 @@ def clang_version(): patchlevel = int(clang_patchlevel) compiler = "clang++" except Exception: - print("Failed to parse clang++ version.") + logger.warning("Failed to parse clang++ version.") return None if (major, minor) < (MIN_CLANG_MAJOR, MIN_CLANG_MINOR): - print( + logger.warning( "Found clang++ version {}.{}.{}. First usable version is {}.{}.0".format( major, minor, patchlevel, MIN_CLANG_MAJOR, MIN_CLANG_MINOR ) @@ -1148,12 +1137,12 @@ def clang_version(): stderr=subprocess.DEVNULL, ) except (OSError, subprocess.SubprocessError): - print( + logger.warning( "clang++ is present but misconfigured: the command 'llvm-profdata' is missing" ) return None - print("Found {} version {}.{}.{}".format(compiler, major, minor, patchlevel)) + logger.info("Found {} version {}.{}.{}".format(compiler, major, minor, patchlevel)) return (compiler, major, minor, patchlevel) @@ -1169,7 +1158,7 @@ def detect_compilers(): def verify_toolchain(): - print("Toolchain check...") + logger.info("Toolchain check...") cmds = { "strip": ["strip", "-V"], "make": ["make", "-v"], @@ -1184,16 +1173,16 @@ def verify_toolchain(): try: p = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) if p.returncode != 0: - print( + logger.error( f"'{cmd_str}' returned code: {p.returncode} and error: {p.stderr.decode().strip()}" ) missing_tools.append(name) except (OSError, subprocess.SubprocessError) as e: - print(f"'{cmd_str}' raised: {type(e).__name__}: {e}") + logger.error(f"'{cmd_str}' raised: {type(e).__name__}: {e}") missing_tools.append(name) if missing_tools: - print(f"Missing required tools: {', '.join(missing_tools)}") + logger.error(f"Missing required tools: {', '.join(missing_tools)}") return False return True @@ -1220,7 +1209,7 @@ def get_exception(files): def heartbeat(worker_info, password, remote, current_state): - print("Start heartbeat") + logger.info("Start heartbeat") payload = { "password": password, "worker_info": worker_info, @@ -1229,28 +1218,31 @@ def heartbeat(worker_info, password, remote, current_state): time.sleep(1) now = datetime.now(timezone.utc) if current_state["last_updated"] + timedelta(seconds=120) < now: - print(" Send heartbeat for", worker_info["unique_key"], end="... 
") current_state["last_updated"] = now run = current_state["run"] payload["run_id"] = str(run["_id"]) if run else None task_id = current_state["task_id"] payload["task_id"] = task_id if payload["run_id"] is None or payload["task_id"] is None: - print("Skipping heartbeat...") + logger.warning("Skipping heartbeat for " + worker_info["unique_key"]) continue try: req = send_api_post_request(remote + "/api/beat", payload, quiet=True) except Exception as e: - print("Exception calling heartbeat:\n", e, sep="", file=sys.stderr) + logger.error("Exception calling heartbeat: " + str(e)) else: if "error" not in req: - print("(received)") + logger.info( + "Heartbeat for " + + worker_info["unique_key"] + + " received by server!" + ) task_alive = req.get("task_alive", True) if not task_alive: current_state["task_id"] = None current_state["run"] = None else: - print("Heartbeat stopped") + logger.info("Heartbeat stopped") def utcoffset(): @@ -1268,7 +1260,7 @@ def verify_worker_version(remote, username, password, worker_lock): # None: network error: unable to verify # We don't return if the server informs us that a newer version of the worker # is available - print("Verify worker version...") + logger.info("Verify worker version...") payload = {"worker_info": {"username": username}, "password": password} try: req = send_api_post_request(remote + "/api/request_version", payload) @@ -1277,19 +1269,19 @@ def verify_worker_version(remote, username, password, worker_lock): if "error" in req: return False # likewise if req["version"] > WORKER_VERSION: - print("Updating worker version to {}".format(req["version"])) + logger.info("Updating worker version to {}".format(req["version"])) backup_log() try: worker_lock.release() update() except Exception as e: - print( - "Exception while updating to version {}:\n".format(req["version"]), - e, - sep="", - file=sys.stderr, + logger.error( + "Exception while updating to version {}: ".format(req["version"]) + + str(e) ) - print("Attempted update to worker version {} failed!".format(req["version"])) + logger.error( + "Attempted update to worker version {} failed!".format(req["version"]) + ) return False return True @@ -1308,7 +1300,7 @@ def fetch_and_handle_task( # current_state["alive"] to False. # Print the current time for log purposes - print( + logger.info( "Current time is {} UTC (local offset: {}) ".format( datetime.now(timezone.utc), utcoffset() ) @@ -1323,10 +1315,10 @@ def fetch_and_handle_task( # Verify if we still have enough GitHub api calls remaining = get_remaining_github_api_calls() - print("Remaining number of GitHub api calls = {}".format(remaining)) + logger.info("Remaining number of GitHub api calls = {}".format(remaining)) near_github_api_limit = remaining <= 10 if near_github_api_limit: - print( + logger.warning( """ We have almost exhausted our GitHub api calls. The server will only give us tasks for tests we have seen before. @@ -1335,7 +1327,7 @@ def fetch_and_handle_task( worker_info["near_github_api_limit"] = near_github_api_limit # Let's go! - print("Fetching task...") + logger.info("Fetching task...") payload = {"worker_info": worker_info, "password": password} try: req = send_api_post_request(remote + "/api/request_task", payload) @@ -1347,14 +1339,14 @@ def fetch_and_handle_task( # No tasks ready for us yet, just wait... 
if "task_waiting" in req: - print("No tasks available at this time, waiting...") + logger.info("No tasks available at this time, waiting...") return False run, task_id = req["run"], req["task_id"] current_state["run"] = run current_state["task_id"] = task_id - print( + logger.info( "Working on task {} from {}/tests/view/{}".format(task_id, remote, run["_id"]) ) if "sprt" in run["args"]: @@ -1375,7 +1367,9 @@ def fetch_and_handle_task( run["args"]["num_games"], ) ) - print("Running {} vs {}".format(run["args"]["new_tag"], run["args"]["base_tag"])) + logger.info( + "Running {} vs {}".format(run["args"]["new_tag"], run["args"]["base_tag"]) + ) success = False message = "" @@ -1423,12 +1417,12 @@ def fetch_and_handle_task( } if not success: - print("\nException running games:\n", message, sep="", file=sys.stderr) - print("Informing the server") + logger.info("Exception running games: " + message) + logger.info("Informing the server") try: req = send_api_post_request(api, payload) except Exception as e: - print("Exception posting failed_task:\n", e, sep="", file=sys.stderr) + logger.error("Exception posting failed_task: " + str(e)) def upload_pgn_data(pgn_data, run_id, task_id, remote, payload): with io.BytesIO() as gz_buffer: @@ -1440,7 +1434,7 @@ def upload_pgn_data(pgn_data, run_id, task_id, remote, payload): gz.write(pgn_data.encode()) payload["pgn"] = base64.b64encode(gz_buffer.getvalue()).decode() - print("Uploading compressed PGN of {} bytes".format(len(payload["pgn"]))) + logger.info("Uploading compressed PGN of {} bytes".format(len(payload["pgn"]))) send_api_post_request(remote + "/api/upload_pgn", payload) if ( @@ -1448,7 +1442,7 @@ def upload_pgn_data(pgn_data, run_id, task_id, remote, payload): or not pgn_file["name"].exists() or pgn_file["name"].stat().st_size == 0 ): - print("Task exited") + logger.info("Task exited") return success crc_expected = pgn_file["CRC"] @@ -1463,7 +1457,7 @@ def upload_pgn_data(pgn_data, run_id, task_id, remote, payload): # Check that the file is not corrupted if crc_actual != crc_expected: - print( + logger.error( f"Checksum of file ({crc_actual}) does not match expected value ({crc_expected}).\nSkipping upload." ) else: @@ -1471,14 +1465,14 @@ def upload_pgn_data(pgn_data, run_id, task_id, remote, payload): data = file_content.decode("utf-8", errors="ignore") upload_pgn_data(data, run["_id"], task_id, remote, payload) except Exception as e: - print("\nException uploading PGN file:\n", e, sep="", file=sys.stderr) + logger.error("Exception uploading PGN file: " + str(e)) try: pgn_file.unlink() except Exception as e: - print("Exception deleting PGN file:\n", e, sep="", file=sys.stderr) + logger.error("Exception deleting PGN file:\n" + str(e)) - print("Task exited") + logger.info("Task exited") return success @@ -1489,17 +1483,17 @@ def worker(): worker_lock = openlock.FileLock(LOCK_FILE) worker_lock.acquire(timeout=0) except openlock.Timeout: - print( - f"\n*** Another worker (with PID={worker_lock.getpid()}) is already running in this " + logger.error( + f"*** Another worker (with PID={worker_lock.getpid()}) is already running in this " "directory ***" ) return 1 # Make sure that the worker can upgrade! 
except Exception as e: - print("\n *** Unexpected exception: {} ***\n".format(str(e))) + logger.error("*** Unexpected exception: {} *** ".format(str(e))) worker_dir = Path(__file__).resolve().parent - print("Worker started in {} with PID={}".format(worker_dir, os.getpid())) + logger.info("Worker started in {} with PID={}".format(worker_dir, os.getpid())) # Create the testing directory if missing. (worker_dir / "testing").mkdir(exist_ok=True) @@ -1536,7 +1530,7 @@ def worker(): options = setup_parameters(worker_dir) if options is None: - print("Error parsing options. Config file not written.") + logger.error("Error parsing options. Config file not written.") return 1 # Write sri hashes of the worker files @@ -1557,7 +1551,7 @@ def worker(): ): return 1 except Exception as e: - print("Exception verifying worker version:\n", e, sep="", file=sys.stderr) + logger.error("Exception verifying worker version: " + str(e)) return 1 # Assemble the config/options data as well as some other data in a @@ -1565,7 +1559,7 @@ def worker(): # This data will be sent to the server when a new task is requested. compiler, major, minor, patchlevel = options.compiler - print("Using {} {}.{}.{}".format(compiler, major, minor, patchlevel)) + logger.info("Using {} {}.{}.{}".format(compiler, major, minor, patchlevel)) # Check for common tool chain issues if not verify_toolchain(): @@ -1608,7 +1602,7 @@ def worker(): "nps": 0.0, } - print("UUID:", worker_info["unique_key"]) + logger.info("UUID: " + worker_info["unique_key"]) # Start heartbeat thread as a daemon (not strictly necessary, but there might be bugs) heartbeat_thread = threading.Thread( @@ -1639,7 +1633,7 @@ def worker(): ) if (worker_dir / "fish.exit").is_file(): current_state["alive"] = False - print("Stopped by 'fish.exit' file") + logger.info("Stopped by 'fish.exit' file") fish_exit = True break elif not current_state["alive"]: # the user may have pressed Ctrl-C... @@ -1647,23 +1641,25 @@ def worker(): elif not success: if options.fleet: current_state["alive"] = False - print("Exiting the worker since fleet==True and an error occurred") + logger.info( + "Exiting the worker since fleet==True and an error occurred" + ) break else: - print("Waiting {} seconds before retrying".format(delay)) + logger.info("Waiting {} seconds before retrying".format(delay)) safe_sleep(delay) delay = min(MAX_RETRY_TIME, delay * 2) else: delay = INITIAL_RETRY_TIME if fish_exit: - print("Removing fish.exit file") + logger.info("Removing fish.exit file") (worker_dir / "fish.exit").unlink() - print("Releasing the worker lock") + logger.info("Releasing the worker lock") worker_lock.release() - print("Waiting for the heartbeat thread to finish...") + logger.info("Waiting for the heartbeat thread to finish...") heartbeat_thread.join(THREAD_JOIN_TIMEOUT) return 0 if fish_exit else 1
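
Note: this diff builds every message eagerly, with str.format() or "+" concatenation, before handing it to the logger. A possible follow-up (not part of this diff) is logging's lazy %-style arguments, which defer the string interpolation until a handler actually emits the record, so records filtered out by level cost no formatting work. A minimal sketch of the pattern; do_request() is a hypothetical stand-in, not a function from this codebase:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def do_request():
        # hypothetical stand-in for a call like requests.get() in games.py
        raise ConnectionError("simulated network failure")

    try:
        do_request()
    except Exception as e:
        # "%s" is interpolated with e only when the record is emitted,
        # so suppressed levels skip the formatting entirely
        logger.error("Exception in do_request(): %s", e)

With logging.basicConfig(level=logging.INFO) as set in worker.py, such records go to stderr in the default "LEVEL:name:message" layout; attaching a logging.Formatter would be one way to restore timestamped output if that is wanted later.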