Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor everserver 2 #9777

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ert/run_models/everest_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class EverestExitCode(IntEnum):
MAX_FUNCTIONS_REACHED = 3
MAX_BATCH_NUM_REACHED = 4
USER_ABORT = 5
EXCEPTION = 6


class EverestRunModel(BaseRunModel):
Expand Down
15 changes: 13 additions & 2 deletions src/everest/bin/everest_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
import threading
from functools import partial

from ert.run_models.everest_run_model import EverestRunModel
from everest.config import EverestConfig, ServerConfig
from everest.detached import (
ServerStatus,
everserver_status,
server_is_running,
start_experiment,
start_server,
wait_for_server,
)
Expand Down Expand Up @@ -114,7 +114,11 @@ async def run_everest(options):
except ValueError as exc:
raise SystemExit(f"Config validation error: {exc}") from exc

if EverestRunModel.create(options.config).check_if_runpath_exists():
if (
options.config.simulation_dir is not None
and os.path.exists(options.config.simulation_dir)
and any(os.listdir(options.config.simulation_dir))
):
warn_user_that_runpath_is_nonempty()

try:
Expand All @@ -128,6 +132,13 @@ async def run_everest(options):
print("Waiting for server ...")
wait_for_server(options.config.output_dir, timeout=600)
print("Everest server found!")

start_experiment(
server_context=ServerConfig.get_server_context(options.config.output_dir),
config=options.config,
)

# blocks until the run is finished
run_detached_monitor(
server_context=ServerConfig.get_server_context(options.config.output_dir),
optimization_output_dir=options.config.optimization_output_dir,
Expand Down
60 changes: 36 additions & 24 deletions src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
OPT_PROGRESS_ID,
SIM_PROGRESS_ENDPOINT,
SIM_PROGRESS_ID,
START_EXPERIMENT_ENDPOINT,
STOP_ENDPOINT,
)

Expand Down Expand Up @@ -62,7 +63,9 @@ async def start_server(config: EverestConfig, debug: bool = False) -> Driver:
return driver


def stop_server(server_context: tuple[str, str, tuple[str, str]], retries: int = 5):
def stop_server(
server_context: tuple[str, str, tuple[str, str]], retries: int = 5
) -> bool:
"""
Stop server if found and it is running.
"""
Expand All @@ -84,6 +87,30 @@ def stop_server(server_context: tuple[str, str, tuple[str, str]], retries: int =
return False


def start_experiment(
    server_context: tuple[str, str, tuple[str, str]],
    config: EverestConfig,
    retries: int = 5,
) -> None:
    """Ask the Everest server to start an experiment.

    POSTs ``config.to_dict()`` as JSON to the server's start-experiment
    endpoint. A failed attempt (connection error or non-2xx response) is
    logged at debug level and retried after ``retry`` seconds, i.e. the
    delay grows linearly: 0 s, 1 s, 2 s, ...

    Args:
        server_context: ``(url, cert, auth)`` triple; ``cert`` is passed to
            requests as ``verify`` and ``auth`` as ``auth``.
        config: Configuration to send to the server.
        retries: Maximum number of attempts.

    Raises:
        ValueError: If none of the attempts succeeded. The last underlying
            error, if any, is attached as ``__cause__``.
    """
    last_error: Exception | None = None
    for retry in range(retries):
        try:
            url, cert, auth = server_context
            start_endpoint = "/".join([url, START_EXPERIMENT_ENDPOINT])
            response = requests.post(
                start_endpoint,
                verify=cert,
                auth=auth,
                proxies=PROXY,  # type: ignore
                json=config.to_dict(),
            )
            response.raise_for_status()
            return
        except Exception as exc:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate so the user can abort the retries.
            last_error = exc
            logging.debug(traceback.format_exc())
            # No point in waiting once there are no attempts left.
            if retry < retries - 1:
                time.sleep(retry)
    raise ValueError("Failed to start experiment") from last_error


def extract_errors_from_file(path: str):
with open(path, encoding="utf-8") as f:
content = f.read()
Expand All @@ -97,29 +124,13 @@ def wait_for_server(output_dir: str, timeout: int) -> None:

Raise an exception when the timeout is reached.
"""
everserver_status_path = ServerConfig.get_everserver_status_path(output_dir)
if not server_is_running(*ServerConfig.get_server_context(output_dir)):
sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1)
for retry_count in range(_HTTP_REQUEST_RETRY):
# Failure may occur before contact with the server is established:
status = everserver_status(everserver_status_path)
if status["status"] == ServerStatus.completed:
# For very small cases the optimization will finish and bring down the
# server before we can verify that it is running.
return

if status["status"] == ServerStatus.failed:
raise SystemExit(
"Failed to start Everest with error:\n{}".format(status["message"])
)

sleep_time = sleep_time_increment * (2**retry_count)
time.sleep(sleep_time)
if server_is_running(*ServerConfig.get_server_context(output_dir)):
return

# If number of retries reached and server is not running - throw exception
raise RuntimeError("Failed to start server within configured timeout.")
sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1)
for retry_count in range(_HTTP_REQUEST_RETRY):
if server_is_running(*ServerConfig.get_server_context(output_dir)):
return
else:
time.sleep(sleep_time_increment * (2**retry_count))
raise RuntimeError("Failed to get reply from server within configured timeout.")


def get_opt_status(output_folder):
Expand Down Expand Up @@ -175,6 +186,7 @@ def wait_for_server_to_stop(server_context: tuple[str, str, tuple[str, str]], ti

def server_is_running(url: str, cert: str, auth: tuple[str, str]):
try:
logging.info(f"Checking server status at {url} ")
response = requests.get(
url,
verify=cert,
Expand Down
Loading
Loading