Skip to content

Commit

Permalink
Merge pull request #47 from romeokienzler/romeokienzler-patch-1
Browse files Browse the repository at this point in the history
implement #45 (single string cos connection), support pluggable grid wrapper backends, implement s3kv backend
  • Loading branch information
romeokienzler authored Feb 23, 2024
2 parents 171a5cf + 5948558 commit 996c8b8
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 29 deletions.
31 changes: 22 additions & 9 deletions src/c3/create_gridwrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from c3.pythonscript import Pythonscript
from c3.utils import convert_notebook
from c3.create_operator import create_operator
from c3.templates import grid_wrapper_template, cos_grid_wrapper_template, component_setup_code_wo_logging
from c3.templates import component_setup_code_wo_logging
import c3


def wrap_component(component_path,
Expand All @@ -16,12 +17,24 @@ def wrap_component(component_path,
component_interface,
component_inputs,
component_process,
cos,
backend,
):
# get component name from path
component_name = os.path.splitext(os.path.basename(component_path))[0]

gw_template = cos_grid_wrapper_template if cos else grid_wrapper_template
logging.info(f'Using backend: {backend}')


backends = {
'cos_grid_wrapper' : c3.templates.cos_grid_wrapper_template,
'grid_wrapper' : c3.templates.grid_wrapper_template,
's3kv_grid_wrapper': c3.templates.s3kv_grid_wrapper_template,
}
gw_template = backends.get(backend)

logging.debug(f'Using backend template: {gw_template}')


grid_wrapper_code = gw_template.substitute(
component_name=component_name,
component_description=component_description,
Expand All @@ -32,7 +45,7 @@ def wrap_component(component_path,
)

# Write edited code to file
grid_wrapper_file = f'cgw_{component_name}.py' if cos else f'gw_{component_name}.py'
grid_wrapper_file = 'grid_wrapper.py'
grid_wrapper_file_path = os.path.join(os.path.dirname(component_path), grid_wrapper_file)
# remove 'component_' from gw path
grid_wrapper_file_path = grid_wrapper_file_path.replace('component_', '')
Expand Down Expand Up @@ -112,7 +125,7 @@ def edit_component_code(file_path, component_process):
return target_file


def apply_grid_wrapper(file_path, component_process, cos):
def apply_grid_wrapper(file_path, component_process, backend):
assert file_path.endswith('.py') or file_path.endswith('.ipynb'), \
"Please provide a component file path to a python script or notebook."

Expand All @@ -134,7 +147,7 @@ def apply_grid_wrapper(file_path, component_process, cos):
logging.debug(component + ':\n' + str(value) + '\n')

logging.info('Wrap component')
grid_wrapper_file_path = wrap_component(cos=cos, **component_elements)
grid_wrapper_file_path = wrap_component(backend=backend, **component_elements)
return grid_wrapper_file_path, file_path


Expand All @@ -146,8 +159,8 @@ def main():
help='List of paths to additional files to include in the container image')
parser.add_argument('-p', '--component_process', type=str, default='grid_process',
help='Name of the component sub process that is executed for each batch.')
parser.add_argument('--cos', action=argparse.BooleanOptionalAction, default=False,
help='Creates a grid wrapper for processing COS files')
parser.add_argument('-b', '--backend', type=str, default='s3kv_grid_wrapper',
help='Define backend. Default: s3kv_grid_wrapper. Others: grid_wrapper, cos_grid_wrapper')
parser.add_argument('-r', '--repository', type=str, default=None,
help='Container registry address, e.g. docker.io/<username>')
parser.add_argument('-v', '--version', type=str, default=None,
Expand Down Expand Up @@ -182,7 +195,7 @@ def main():
grid_wrapper_file_path, component_path = apply_grid_wrapper(
file_path=args.FILE_PATH,
component_process=args.component_process,
cos=args.cos,
backend=args.backend,
)

logging.info('Generate CLAIMED operator for grid wrapper')
Expand Down
4 changes: 4 additions & 0 deletions src/c3/templates/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
CWL_COMPONENT_FILE = 'cwl_component_template.cwl'
GRID_WRAPPER_FILE = 'grid_wrapper_template.py'
COS_GRID_WRAPPER_FILE = 'cos_grid_wrapper_template.py'
S3KV_GRID_WRAPPER_FILE = 's3kv_grid_wrapper_template.py'

# load templates
template_path = Path(os.path.dirname(__file__))
Expand Down Expand Up @@ -47,3 +48,6 @@

with open(template_path / COS_GRID_WRAPPER_FILE, 'r') as f:
cos_grid_wrapper_template = Template(f.read())

with open(template_path / S3KV_GRID_WRAPPER_FILE, 'r') as f:
s3kv_grid_wrapper_template = Template(f.read())
1 change: 0 additions & 1 deletion src/c3/templates/cos_grid_wrapper_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
(gw_target_access_key_id, gw_target_secret_access_key, gw_target_endpoint, gw_target_path) = explode_connection_string(gw_target_connection)

# cos gw_coordinator_connection

gw_coordinator_connection = os.environ.get('gw_coordinator_connection')
(gw_coordinator_access_key_id, gw_coordinator_secret_access_key, gw_coordinator_endpoint, gw_coordinator_path) = explode_connection_string(gw_target_connection)

Expand Down
89 changes: 70 additions & 19 deletions src/c3/templates/grid_wrapper_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,33 @@
import glob
from pathlib import Path
import pandas as pd
import s3fs

# import component code
from ${component_name} import *


# File with batches. Provided as a comma-separated list of strings or keys in a json dict.
explode_connection_string(cs):
if cs is None:
return None
if cs.startswith('cos') or cs.startswith('s3'):
buffer=cs.split('://')[1]
access_key_id=buffer.split('@')[0].split(':')[0]
secret_access_key=buffer.split('@')[0].split(':')[1]
endpoint=buffer.split('@')[1].split('/')[0]
path='/'.join(buffer.split('@')[1].split('/')[1:])
return (access_key_id, secret_access_key, endpoint, path)
else:
return (None, None, None, cs)
# TODO consider cs as secret and grab connection string from kubernetes



# File with batches. Provided as a comma-separated list of strings, keys in a json dict or single column CSV with 'filename' has header. Either local path as [cos|s3]://user:pw@endpoint/path
gw_batch_file = os.environ.get('gw_batch_file', None)
(gw_batch_file_access_key_id, gw_batch_secret_access_key, gw_batch_endpoint, gw_batch_file) = explode_connection_string(gw_batch_file):


# file path pattern like your/path/**/*.tif. Multiple patterns can be separated with commas. Is ignored if gw_batch_file is provided.
gw_file_path_pattern = os.environ.get('gw_file_path_pattern', None)
# pattern for grouping file paths into batches like ".split('.')[-1]". Is ignored if gw_batch_file is provided.
Expand All @@ -43,25 +63,56 @@
${component_interface}

def load_batches_from_file(batch_file):
if batch_file.endswith('.json'):
# load batches from keys of a json file
logging.info(f'Loading batches from json file: {batch_file}')
with open(batch_file, 'r') as f:
batch_dict = json.load(f)
batches = batch_dict.keys()

elif batch_file.endswith('.csv'):
# load batches from keys of a csv file
logging.info(f'Loading batches from csv file: {batch_file}')
df = pd.read_csv(batch_file, header='infer')
batches = df['filename'].to_list()
if gw_batch_file_access_key_id is not None:
s3source = s3fs.S3FileSystem(
anon=False,
key=gw_batch_file_access_key_id,
secret=gw_batch_secret_access_key,
client_kwargs={'endpoint_url': gw_batch_endpoint})


if batch_file.endswith('.json'):
# load batches from keys of a json file
logging.info(f'Loading batches from json file: {batch_file}')
with s3source.open(gw_batch_file, 'r') as f:
batch_dict = json.load(f)
batches = batch_dict.keys()

elif batch_file.endswith('.csv'):
# load batches from keys of a csv file
logging.info(f'Loading batches from csv file: {batch_file}')
s3source.get(batch_file, batch_file)
df = pd.read_csv(batch_file, header='infer')
batches = df['filename'].to_list()


else:
# Load batches from comma-separated txt file
logging.info(f'Loading comma-separated batch strings from file: {batch_file}')
with s3source.open(gw_batch_file, 'r') as f:
batch_string = f.read()
batches = [b.strip() for b in batch_string.split(',')]

else:
# Load batches from comma-separated txt file
logging.info(f'Loading comma-separated batch strings from file: {batch_file}')
with open(batch_file, 'r') as f:
batch_string = f.read()
batches = [b.strip() for b in batch_string.split(',')]
if batch_file.endswith('.json'):
# load batches from keys of a json file
logging.info(f'Loading batches from json file: {batch_file}')
with open(batch_file, 'r') as f:
batch_dict = json.load(f)
batches = batch_dict.keys()

elif batch_file.endswith('.csv'):
# load batches from keys of a csv file
logging.info(f'Loading batches from csv file: {batch_file}')
df = pd.read_csv(batch_file, header='infer')
batches = df['filename'].to_list()

else:
# Load batches from comma-separated txt file
logging.info(f'Loading comma-separated batch strings from file: {batch_file}')
with open(batch_file, 'r') as f:
batch_string = f.read()
batches = [b.strip() for b in batch_string.split(',')]

logging.info(f'Loaded {len(batches)} batches')
logging.debug(f'List of batches: {batches}')
Expand Down Expand Up @@ -198,4 +249,4 @@ def process_wrapper(sub_process):


if __name__ == '__main__':
process_wrapper(${component_process})
process_wrapper(${component_process})
Loading

0 comments on commit 996c8b8

Please sign in to comment.