Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-aarstad committed Jan 16, 2025
1 parent dc258ad commit d0b2ca4
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 75 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,3 @@ src/ert/shared/version.py

# config for pyright
pyrightconfig.json

justfile

6 changes: 0 additions & 6 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,3 @@ snake_oil:
# execute rapid unittests
rapid-tests:
nice pytest -n logical tests/ert/unit_tests -m "not integration_tests"


eve-math:
git clean -fd
everest run test-data/everest/math_func/config_minimal.yml --debug

4 changes: 1 addition & 3 deletions src/everest/bin/everest_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import threading
from functools import partial

from ert.run_models.everest_run_model import EverestRunModel
from everest.config import EverestConfig, ServerConfig
from everest.detached import (
ServerStatus,
Expand Down Expand Up @@ -119,7 +118,7 @@ async def run_everest(options):
options.config.simulation_dir is not None
and os.path.exists(options.config.simulation_dir)
and any(os.listdir(options.config.simulation_dir))
):
):
warn_user_that_runpath_is_nonempty()

try:
Expand All @@ -134,7 +133,6 @@ async def run_everest(options):
wait_for_server(options.config.output_dir, timeout=600)
print("Everest server found!")


start_experiment(
server_context=ServerConfig.get_server_context(options.config.output_dir),
config=options.config,
Expand Down
2 changes: 0 additions & 2 deletions src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def stop_server(server_context: tuple[str, str, tuple[str, str]], retries: int =
return False



def start_experiment(
server_context: tuple[str, str, tuple[str, str]],
config: EverestConfig,
Expand All @@ -108,7 +107,6 @@ def start_experiment(
raise ValueError("Failed to start experiment.") from e



def extract_errors_from_file(path: str):
with open(path, encoding="utf-8") as f:
content = f.read()
Expand Down
67 changes: 34 additions & 33 deletions src/everest/detached/jobs/everserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import socket
import ssl
import threading
import time
import traceback
from base64 import b64encode
from functools import partial
from pathlib import Path
from typing import Any
import time

import requests
import uvicorn
from cryptography import x509
Expand All @@ -32,12 +33,16 @@
HTTPBasicCredentials,
)

from ert.config.parsing.queue_system import QueueSystem
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.run_models.everest_run_model import EverestExitCode, EverestRunModel
from everest import export_to_csv, export_with_progress
from everest.config import EverestConfig, ServerConfig
from everest.detached import ServerStatus, get_opt_status, start_experiment, update_everserver_status, wait_for_server
from everest.detached import (
PROXY,
ServerStatus,
get_opt_status,
update_everserver_status,
)
from everest.export import check_for_errors
from everest.plugins.everest_plugin_manager import EverestPluginManager
from everest.simulator import JOB_FAILURE
Expand All @@ -55,8 +60,6 @@
from everest.util import makedirs_if_needed, version_info


from everest.detached import PROXY

class ExperimentRunner(threading.Thread):
def __init__(self, everest_config, shared_data: dict):
super().__init__()
Expand All @@ -65,8 +68,6 @@ def __init__(self, everest_config, shared_data: dict):
self._shared_data = shared_data
self._exit_code: EverestExitCode | None = None



def run(self):
run_model = EverestRunModel.create(
self._everest_config,
Expand All @@ -75,7 +76,7 @@ def run(self):
)

if self._everest_config.simulator.queue_system.name == "local":
#if run_model._queue_config.queue_system == QueueSystem.LOCAL:
# if run_model._queue_config.queue_system == QueueSystem.LOCAL:
evaluator_server_config = EvaluatorServerConfig()
else:
evaluator_server_config = EvaluatorServerConfig(
Expand All @@ -91,20 +92,17 @@ def run(self):
except Exception:
self._exit_code = EverestExitCode.EXCEPTION
print("EXCEPTION")
#self.status = ExperimentRunnerStatus(
# self.status = ExperimentRunnerStatus(
# status="Experiment failed", message=traceback.format_exc()
#)
# )

@property
def exit_code(self) -> EverestExitCode | None:
return self._exit_code

@property
def shared_data(self) -> dict:
return self._shared_data





def _get_machine_name() -> str:
Expand Down Expand Up @@ -135,6 +133,8 @@ def _get_machine_name() -> str:


def _sim_monitor(context_status, shared_data=None):
assert shared_data is not None

status = context_status["status"]
shared_data[SIM_PROGRESS_ENDPOINT] = {
"batch_number": context_status["batch_number"],
Expand All @@ -153,6 +153,7 @@ def _sim_monitor(context_status, shared_data=None):


def _opt_monitor(shared_data=None):
assert shared_data is not None
if shared_data[STOP_ENDPOINT]:
return "stop_optimization"

Expand All @@ -161,7 +162,7 @@ def _everserver_thread(shared_data, server_config) -> None:
app = FastAPI()
security = HTTPBasic()

runner:ExperimentRunner | None = None
runner: ExperimentRunner | None = None

def _check_user(credentials: HTTPBasicCredentials) -> None:
if credentials.password != server_config["authentication"]:
Expand Down Expand Up @@ -191,6 +192,7 @@ def stop(
_log(request)
_check_user(credentials)
print(f"STOP ENDPOINT {shared_data}")

shared_data[STOP_ENDPOINT] = True
return Response("Raise STOP flag succeeded. Everest initiates shutdown..", 200)

Expand All @@ -211,7 +213,7 @@ def get_opt_progress(
_check_user(credentials)
progress = get_opt_status(server_config["optimization_output_dir"])
return JSONResponse(jsonable_encoder(progress))

@app.post("/" + START_EXPERIMENT_ENDPOINT)
def start_experiment(
config: EverestConfig,
Expand All @@ -233,6 +235,9 @@ def get_experiment_status(
) -> Response:
_log(request)
_check_user(credentials)

if shared_data[STOP_ENDPOINT]:
return Response(f"{EverestExitCode.USER_ABORT}", 200)
if runner is None:
return Response(None, 204)
status = runner.exit_code
Expand All @@ -250,8 +255,6 @@ def get_shared_data(
return JSONResponse(jsonable_encoder(shared_data))
return JSONResponse(jsonable_encoder(runner.shared_data))



uvicorn.run(
app,
host="0.0.0.0",
Expand Down Expand Up @@ -412,29 +415,28 @@ def main():

update_everserver_status(status_path, ServerStatus.running)


# add timeout
# add timeout
is_done = True
while (is_done):
exit_code = None
while is_done:
response = requests.get(
"/".join([url, EXPERIMENT_STATUS_ENDPOINT]),
verify=cert,
auth=auth,
proxies=PROXY, # type: ignore
)
exit_code = None
if response.status_code == requests.codes.OK:
exit_code = int( response.text if hasattr(response, "text") else response.body)
if response.status_code == requests.codes.OK:
exit_code = int(
response.text if hasattr(response, "text") else response.body
)
is_done = False
else:
time.sleep(1)


#status, message = _get_optimization_status(exit_code, shared_data)
#if status != ServerStatus.completed:
# status, message = _get_optimization_status(exit_code, shared_data)
# if status != ServerStatus.completed:
# update_everserver_status(status_path, status, message)
# time.sleep(1)

# time.sleep(1)

response: requests.Response = requests.get(

Check failure on line 441 in src/everest/detached/jobs/everserver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Name "response" already defined on line 422
url + "/" + SHARED_DATA_ENDPOINT,
Expand All @@ -450,12 +452,11 @@ def main():
print("Shared data: ", shared_data)
print("Exit code: ", exit_code)


status, message = _get_optimization_status(exit_code, shared_data)
if status != ServerStatus.completed:
update_everserver_status(status_path, status, message)
return
except Exception as e:
except Exception:
if shared_data[STOP_ENDPOINT]:
update_everserver_status(
status_path,
Expand Down Expand Up @@ -490,7 +491,7 @@ def main():
data_frame=export_with_progress(config, export_ecl),
export_path=config.export_path,
)
except Exception as e:
except Exception:
update_everserver_status(
status_path,
ServerStatus.failed,
Expand All @@ -514,7 +515,7 @@ def _get_optimization_status(exit_code, shared_data):

case EverestExitCode.USER_ABORT:
return ServerStatus.stopped, "Optimization aborted."

case EverestExitCode.EXCEPTION:
return ServerStatus.failed, "Optimization failed with exception."

Expand Down
4 changes: 2 additions & 2 deletions tests/everest/test_detached.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from everest.util import makedirs_if_needed


#@pytest.mark.flaky(reruns=5)
# @pytest.mark.flaky(reruns=5)
@pytest.mark.integration_test
@pytest.mark.fails_on_macos_github_workflow
@pytest.mark.xdist_group(name="starts_everest")
Expand All @@ -66,7 +66,7 @@ async def test_https_requests(copy_math_func_test_data_to_tmp):
raise e

server_status = everserver_status(status_path)
assert ServerStatus.running == server_status["status"]
assert server_status["status"] in {ServerStatus.running, ServerStatus.starting}

url, cert, auth = ServerConfig.get_server_context(everest_config.output_dir)
result = requests.get(url, verify=cert, auth=auth, proxies=PROXY) # noqa: ASYNC210
Expand Down
Loading

0 comments on commit d0b2ca4

Please sign in to comment.