This package provides a client to interact with OSCAR (https://oscar.grycap.net) clusters and services. It is available on Pypi with the name oscar-python.
options_basic_auth = {'cluster_id':'cluster-id',
'endpoint':'https://cluster-endpoint',
'user':'username',
'password':'password',
'ssl':'True'}
client = Client(options = options_basic_auth)If you want to use OIDC tokens to authenticate with EGI Check-In, you can use the OIDC Agent to create an account configuration for the EGI issuer (https://aai.egi.eu/auth/realms/egi/) and then initialize the client specifying the shortname of your account like follows.
options_oidc_auth = {'cluster_id':'cluster-id',
'endpoint':'https://cluster-endpoint',
'shortname':'oidc-agent-shortname',
'ssl':'True'}
client = Client(options = options_oidc_auth)If you already have a valid token, you can use the parameter oidc_token instead.
options_oidc_auth = {'cluster_id':'cluster-id',
'endpoint':'https://cluster-endpoint',
'oidc_token':'token',
'ssl':'True'}
client = Client(options = options_oidc_auth)An example of using a generated token is if you want to use EGI Notebooks. Since you can't use oidc-agent on the Notebook, you can make use of the generated token that EGI provides on path /var/run/secrets/egi.eu/access_token.
If you have a valid refresh token (long live token), you can use the parameter refresh_token instead.
options_oidc_auth = {'cluster_id':'cluster-id',
'endpoint':'https://cluster-endpoint',
'refresh_token':'token',
'ssl':'True'}
client = Client(options = options_oidc_auth)You can get a refresh token from EGI Check-In using the Token Portal.
In case of using other OIDC provider you must provide two additional parameters token_endpoint
and scopes:
options_oidc_auth = {'cluster_id':'cluster-id',
'endpoint':'https://cluster-endpoint',
'refresh_token':'token',
'scopes': ["openid", "profile", "email"],
'token_endpoint': "http://issuer.com/token",
'client_id': "your_client_id"
'ssl':'True'}
client = Client(options = options_oidc_auth)- Sample code that creates a client and gets information about the cluster
from oscar_python.client import Client
options_basic_auth = {'cluster_id':'cluster-id',
'endpoint':'https://cluster-endpoint',
'user':'username',
'password':'password',
'ssl':'True'}
client = Client(options = options)
# get the cluster information
try:
info = client.get_cluster_info()
print(info.text)
except Exception as err:
print("Failed with: ", err)- Sample code to create a simple service with the cowsay example and make a synchronous invocation.
from oscar_python.client import Client
options_basic_auth = {'cluster_id':'cluster-id',
'endpoint':'https://cluster-endpoint',
'user':'username',
'password':'password',
'ssl':'True'}
client = Client(options = options)
try:
client.create_service("/absolute_path/cowsay.yaml")
response = client.run_service("cowsay", input = '{"message": "Hi there"}')
if response.status_code == 200:
print(response.text)
except Exception as err:
print("Failed with: ", err)get_cluster_info
# get the cluster information
info = client.get_cluster_info() # returns an HTTP response or an HTTPErrorget_cluster_config
# get the cluster config
config = client.get_cluster_config() # returns an http response or an HTTPErrorget_service
# get the definition of a service
service = client.get_service("service_name") # returns an http response or an HTTPErrorlist_services
# get a list of all the services deployed
services = client.list_services() # returns an http response or an HTTPErrorNote : Both
path_to_fdland the script path inside the fdl must be absolute.
create_service
# create a service
err = client.create_service("path_to_fdl" | "JSON_definition") # returns nothing if the service is created or an error if something goes wrongupdate_service
# update a service
err = client.update_service("service_name","path_to_fdl" | "JSON_definition") # returns nothing if the service is created or an error if something goes wrongremove_service
# remove a service
response = client.remove_service("service_name") # returns an http responserun_service
input, output and timeout are optional parameters.
# make a synchronous execution
response = client.run_service("service_name", input="input", output="out.png", timeout=100) # returns an http response
# make an asynchronous execution
response = client.run_service("service_name", input="input", async_call=True) # returns an http responseget_job_logs
# get logs of a job
logs = client.get_job_logs("service_name", "job_id") # returns an http responselist_jobs
# get a list of jobs in a service
log_list = client.list_jobs("service_name") # returns an http response
# to get more jobs use the page parameter
log_list = client.list_jobs("service_name",page="token_to_next_page") # returns an http responseremove_job
# remove a job of a service
response = client.remove_job("service_name", "job_id") # returns an http responseremove_all_jobs
# remove all jobs in a service
response = client.remove_all_jobs("service_name") # returns an http responseYou can create a storage object to operate over the different storage providers defined on a service with the method create_storage_client. This constructor returns a storage object with methos to interact with the storage providers.
The default constructor, seen as follows, will create a provider to interact with the default MinIO instance through the user's credentials.
storage_service = client.create_storage_client() # returns a storage objectAdditionally, if you need to interact with specific storage providers defined on a service, the constructor accepts a svc parameter where you can state the service name from which to search for additional credentials.
storage_service = client.create_storage_client("service_name") # returns a storage objectNote : The
storage_providerparameter on the storage methods follows the format:["storage_provider_type"].["storage_provider_name"]wherestorage_provider_typeis one of the suported storage providers (minIO, S3, Onedata or webdav) andstorage_provider_nameis the identifier (ex: minio.default)
list_files_from_path
This method returns a JSON with the info except for OneData, which returns an HTTP response.
# get a list of the files of one of the service storage provider
files = storage_service.list_files_from_path("storage_provider", "remote_path") # returns jsonupload_file
# upload a file from a local path to a remote path
response = storage_service.upload_file("storage_provider", "local_path", "remote_path")download_file
# download a file from a remote path to a local path
response = storage_service.download_file("storage_provider", "local_path", "remote_path")