
Commit 4667bc2

Support for GlobusComputeExecutor to submit functions to globus_compute_endpoints

* A new `GlobusComputeExecutor` implementation
* Docs, tests, and examples
* GitHub Action for GlobusComputeExecutor (#3619)
1 parent 83a2030 commit 4667bc2

File tree

13 files changed: +386 -2 lines changed

.github/workflows/gce_test.yaml

Lines changed: 108 additions & 0 deletions (new file)

name: GlobusComputeExecutor tests

on:
  pull_request:
    types:
      - opened
      - synchronize

env:
  PYTHON_VERSION: 3.11

jobs:
  main-test-suite:
    runs-on: ubuntu-20.04
    timeout-minutes: 60

    steps:
      - uses: actions/checkout@master

      - name: Set up Python Environment
        uses: actions/setup-python@v4
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Collect Job Information
        id: job-info
        run: |
          echo "Python Version: ${{ env.PYTHON_VERSION }}" >> ci_job_info.txt
          echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt
          echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt
          echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt
          echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt
          echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt
          as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")"
          echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT

      - name: setup virtual env
        run: |
          make virtualenv
          source .venv/bin/activate

      - name: Non-requirements based install
        run: |
          # mpich: required by mpi4py which is in test-requirements for radical-pilot
          sudo apt-get update -q
          sudo apt-get install -qy mpich

      - name: make deps clean_coverage
        run: |
          source .venv/bin/activate
          make deps
          make clean_coverage

          # Temporary fix until fixes make it to a release
          git clone -b main https://github.com/globus/globus-compute.git
          pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint

      - name: start globus_compute_endpoint
        env:
          GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
          GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
        run: |
          source /home/runner/work/parsl/parsl/.venv/bin/activate
          globus-compute-endpoint configure default
          which globus-compute-endpoint
          python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)"
          python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)"
          cat << EOF > /home/runner/.globus_compute/default/config.yaml
          engine:
            type: ThreadPoolEngine
            max_workers: 4
          EOF
          cat /home/runner/.globus_compute/default/config.yaml
          mkdir ~/.globus_compute/default/tasks_working_dir
          globus-compute-endpoint start default
          globus-compute-endpoint list

      - name: make test
        env:
          GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
          GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
        run: |
          source .venv/bin/activate
          export GLOBUS_COMPUTE_ENDPOINT=$(jq -r .endpoint_id < ~/.globus_compute/default/endpoint.json)
          echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"

          export PARSL_TEST_PRESERVE_NUM_RUNS=7

          make gce_test
          ln -s pytest-parsl/parsltest-current test_runinfo

      - name: stop globus_compute_endpoint
        env:
          GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
          GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
        run: |
          source /home/runner/work/parsl/parsl/.venv/bin/activate
          globus-compute-endpoint stop default

      - name: Archive runinfo logs
        if: ${{ always() }}
        uses: actions/upload-artifact@v4
        with:
          name: runinfo-${{ env.PYTHON_VERSION }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }}
          path: |
            runinfo/
            pytest-parsl/
            ci_job_info.txt
          compression-level: 9
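The `make test` step above derives `GLOBUS_COMPUTE_ENDPOINT` from the metadata file that `globus-compute-endpoint configure` writes. A standalone sketch of that extraction, using a temporary directory and a stand-in `endpoint.json` created purely for illustration (this assumes `jq` is available, as it is on the GitHub-hosted runners):

```shell
# Stand-in for ~/.globus_compute/default/endpoint.json, which the real
# endpoint writes during `globus-compute-endpoint configure`
demo_dir=$(mktemp -d)
printf '{"endpoint_id": "00000000-0000-0000-0000-000000000000"}\n' > "$demo_dir/endpoint.json"

# Same jq extraction the workflow's `make test` step performs
GLOBUS_COMPUTE_ENDPOINT=$(jq -r .endpoint_id < "$demo_dir/endpoint.json")
echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"
```

`jq -r` prints the raw string without surrounding quotes, which is what the environment variable needs.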

Makefile

Lines changed: 4 additions & 0 deletions

@@ -51,6 +51,10 @@ clean_coverage:
 mypy: ## run mypy checks
 	MYPYPATH=$(CWD)/mypy-stubs mypy parsl/

+.PHONY: gce_test
+gce_test: ## Run tests with GlobusComputeExecutor
+	pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10
+
 .PHONY: local_thread_test
 local_thread_test: ## run all tests with local_thread config
 	pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10

docs/reference.rst

Lines changed: 1 addition & 0 deletions

@@ -79,6 +79,7 @@ Executors
     parsl.executors.taskvine.TaskVineExecutor
     parsl.executors.FluxExecutor
     parsl.executors.radical.RadicalPilotExecutor
+    parsl.executors.GlobusComputeExecutor

 Manager Selectors
 =================

docs/userguide/configuration/execution.rst

Lines changed: 9 additions & 1 deletion

@@ -86,11 +86,19 @@ Parsl currently supports the following executors:
 4. `parsl.executors.taskvine.TaskVineExecutor`: This executor uses `TaskVine <https://ccl.cse.nd.edu/software/taskvine/>`_ as the execution backend. TaskVine scales up to tens of thousands of cores and actively uses local storage on compute nodes to offer a diverse array of performance-oriented features, including: smart caching and sharing common large files between tasks and compute nodes, reliable execution of tasks, dynamic resource sizing, automatic Python environment detection and sharing.
 These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors.

+5. `parsl.executors.GlobusComputeExecutor`: This executor uses `Globus Compute <https://www.globus.org/compute>`_
+   as the execution backend. Globus Compute is a distributed Function as a Service (FaaS) platform that enables secure
+   execution of functions on heterogeneous remote computers, from laptops to campus clusters, clouds, and supercomputers.
+   Functions are executed on `Globus Compute Endpoints <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html>`_
+   that can be `configured <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoint_examples.html>`_
+   to scale execution on most batch schedulers automatically. Since Globus Compute Endpoints use `parsl.executors.HighThroughputExecutor`
+   as the default execution system, this executor can be thought of as an extension of the `parsl.executors.HighThroughputExecutor` with
+   a secure and reliable remote execution wrapper.
+
 .. note::
    Refer to :ref:`configuration-section` for information on how to configure these executors.

-
 Launchers
 ---------

 Many LRMs offer mechanisms for spawning applications across nodes
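The user-guide text added above can be illustrated with a minimal configuration sketch (not part of this commit; the endpoint UUID is a placeholder, and `parsl` plus `globus-compute-sdk` are assumed installed):

```python
from globus_compute_sdk import Executor

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor

# Placeholder endpoint UUID; substitute the id of a Globus Compute
# endpoint that you have configured and started.
config = Config(
    executors=[
        GlobusComputeExecutor(
            executor=Executor(endpoint_id="00000000-0000-0000-0000-000000000000"),
            label="globus_compute",
        )
    ]
)
```

Loading this config with `parsl.load(config)` would then route `@python_app` invocations through the remote endpoint.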

mypy.ini

Lines changed: 3 additions & 0 deletions

@@ -177,6 +177,9 @@ ignore_missing_imports = True
 #[mypy-multiprocessing.synchronization.*]
 #ignore_missing_imports = True

+[mypy-globus_compute_sdk.*]
+ignore_missing_imports = True
+
 [mypy-pandas.*]
 ignore_missing_imports = True

parsl/executors/__init__.py

Lines changed: 3 additions & 1 deletion

@@ -1,4 +1,5 @@
 from parsl.executors.flux.executor import FluxExecutor
+from parsl.executors.globus_compute import GlobusComputeExecutor
 from parsl.executors.high_throughput.executor import HighThroughputExecutor
 from parsl.executors.high_throughput.mpi_executor import MPIExecutor
 from parsl.executors.threads import ThreadPoolExecutor
@@ -8,4 +9,5 @@
     'HighThroughputExecutor',
     'MPIExecutor',
     'WorkQueueExecutor',
-    'FluxExecutor']
+    'FluxExecutor',
+    'GlobusComputeExecutor']

parsl/executors/globus_compute.py

Lines changed: 125 additions & 0 deletions (new file)

from __future__ import annotations

import copy
from concurrent.futures import Future
from typing import Any, Callable, Dict

import typeguard

from parsl.errors import OptionalModuleMissing
from parsl.executors.base import ParslExecutor
from parsl.utils import RepresentationMixin

try:
    from globus_compute_sdk import Executor
    _globus_compute_enabled = True
except ImportError:
    _globus_compute_enabled = False


class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
    """GlobusComputeExecutor enables remote execution on Globus Compute endpoints.

    GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor.
    Refer to the `globus-compute user documentation
    <https://globus-compute.readthedocs.io/en/latest/executor.html>`_
    and `reference documentation
    <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_
    for more details.

    .. note::
       As a remote execution system, Globus Compute relies on serialization to ship
       tasks and results between the Parsl client side and the remote Globus Compute
       Endpoint side. Serialization is unreliable across Python versions, and the
       wrappers used by Parsl assume identical Parsl versions on both sides.
       We recommend using matching Python, Parsl, and Globus Compute versions on
       both the client side and the endpoint side for stable behavior.
    """

    @typeguard.typechecked
    def __init__(
        self,
        executor: Executor,
        label: str = 'GlobusComputeExecutor',
    ):
        """
        Parameters
        ----------

        executor: globus_compute_sdk.Executor
            Pass a globus_compute_sdk Executor that will be used to execute
            tasks on a globus_compute endpoint. Refer to the `globus-compute docs
            <https://globus-compute.readthedocs.io/en/latest/reference/executor.html#globus-compute-executor>`_

        label:
            a label to name the executor
        """
        if not _globus_compute_enabled:
            raise OptionalModuleMissing(
                ['globus-compute-sdk'],
                "GlobusComputeExecutor requires globus-compute-sdk installed"
            )

        super().__init__()
        self.executor: Executor = executor
        self.resource_specification = self.executor.resource_specification
        self.user_endpoint_config = self.executor.user_endpoint_config
        self.label = label

    def start(self) -> None:
        """Start the Globus Compute Executor."""
        pass

    def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
        """Submit func to globus-compute.

        Parameters
        ----------

        func: Callable
            Python function to execute remotely

        resource_specification: Dict[str, Any]
            A resource specification can be used to specify MPI resources required
            by MPI applications on endpoints configured to use Globus Compute's
            MPIEngine. GlobusComputeExecutor also accepts *user_endpoint_config*
            to configure endpoints when the endpoint is a `Multi-User Endpoint
            <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html#templating-endpoint-configuration>`_

        args:
            Args to pass to the function

        kwargs:
            kwargs to pass to the function

        Returns
        -------

        Future
        """
        res_spec = copy.deepcopy(resource_specification or self.resource_specification)
        # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute
        if res_spec:
            user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config)
        else:
            user_endpoint_config = self.user_endpoint_config

        try:
            self.executor.resource_specification = res_spec
            self.executor.user_endpoint_config = user_endpoint_config
            return self.executor.submit(func, *args, **kwargs)
        finally:
            # Reset executor state to defaults set at configuration time
            self.executor.resource_specification = self.resource_specification
            self.executor.user_endpoint_config = self.user_endpoint_config

    def shutdown(self):
        """Clean up the resources associated with the Executor.

        GlobusComputeExecutor.shutdown will cancel all futures that have not yet
        registered with Globus Compute and will not wait for the launched futures
        to complete. This method explicitly shuts down the result_watcher thread
        to avoid it waiting for outstanding futures at thread exit.
        """
        self.executor.shutdown(wait=False, cancel_futures=True)
        result_watcher = self.executor._get_result_watcher()
        result_watcher.shutdown(wait=False, cancel_futures=True)
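The override handling in `submit` above can be sketched in isolation. The helper below is hypothetical (not part of the commit) and mirrors the precedence rules: a per-task resource specification replaces the executor-level default, and `user_endpoint_config` is popped out because Globus Compute does not permit it inside a resource specification:

```python
import copy


def resolve_task_settings(task_spec, default_spec, default_uep_config):
    """Hypothetical mirror of GlobusComputeExecutor.submit's precedence rules."""
    # A per-task spec wins; otherwise fall back to the executor default.
    res_spec = copy.deepcopy(task_spec or default_spec)
    if res_spec:
        # user_endpoint_config is illegal inside a Globus Compute resource
        # specification, so it is extracted and applied separately.
        uep_config = res_spec.pop('user_endpoint_config', default_uep_config)
    else:
        uep_config = default_uep_config
    return res_spec, uep_config


# Per-task spec overrides the default and carries its own endpoint config
spec, uep = resolve_task_settings(
    {'num_ranks': 2, 'user_endpoint_config': {'worker_init': 'module load mpi'}},
    {'num_ranks': 4},
    None,
)
```

Because `submit` mutates the wrapped `globus_compute_sdk.Executor` and restores it in a `finally` block, the per-task values never leak into subsequent submissions.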
parsl/tests/configs/globus_compute.py

Lines changed: 20 additions & 0 deletions (new file)

import os

from globus_compute_sdk import Executor

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor


def fresh_config():

    endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"]

    return Config(
        executors=[
            GlobusComputeExecutor(
                executor=Executor(endpoint_id=endpoint_id),
                label="globus_compute",
            )
        ]
    )

parsl/tests/conftest.py

Lines changed: 4 additions & 0 deletions

@@ -163,6 +163,10 @@ def pytest_configure(config):
         'markers',
         'shared_fs: Marks tests that require a shared_fs between the workers are the test client'
     )
+    config.addinivalue_line(
+        'markers',
+        'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)'
+    )


 @pytest.fixture(autouse=True, scope='session')

parsl/tests/test_error_handling/test_resource_spec.py

Lines changed: 3 additions & 0 deletions

@@ -1,3 +1,5 @@
+import pytest
+
 import parsl
 from parsl.app.app import python_app
 from parsl.executors import WorkQueueExecutor
@@ -11,6 +13,7 @@ def double(x, parsl_resource_specification={}):
     return x * 2


+@pytest.mark.issue_3620
 def test_resource(n=2):
     executors = parsl.dfk().executors
     executor = None
