From 552fd917959c37e6cb9a51994351250f1c6a2060 Mon Sep 17 00:00:00 2001 From: DustinKLo Date: Tue, 4 Jun 2019 16:04:44 -0700 Subject: [PATCH 1/4] de-duplication logic that compares scihub acq with existing acq by missiondatatakeid added function to convert timestamp string to datetime obj to compare scihub acq to existing acq copied deprecate_document function to that we can deprecate the older acq fixed some python formatting added .gitignore file added some logging to my de-duplication logic --- .gitignore | 155 ++++++++++++++++++++++++++++++++++++ deprecate_acquisition.py | 47 +++++++++++ scrape_apihub_opensearch.py | 97 +++++++++++++++++++--- 3 files changed, 289 insertions(+), 10 deletions(-) create mode 100644 .gitignore create mode 100644 deprecate_acquisition.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5c6a264 --- /dev/null +++ b/.gitignore @@ -0,0 +1,155 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + + + +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk +.idea diff --git a/deprecate_acquisition.py b/deprecate_acquisition.py new file mode 100644 index 0000000..20b16e3 --- /dev/null +++ b/deprecate_acquisition.py @@ -0,0 +1,47 @@ +import elasticsearch +from hysds.celery import app + +es_url = app.conf["GRQ_ES_URL"] +# _index = None +# _type = None + +ES = elasticsearch.Elasticsearch(es_url) + + +def deprecate_document(index, _id): + """ + Update the ES document with new information + :param index: name of elasticsearch index + :param _id: id of product delivered to ASF + :param delivery_time: delivery time to ASF to stamp to delivered product (can we delete this note) + :param ingest_time: ingestion time to ASF to stamp to delivered product (can we delete this note) + :param delivery_status: status of delivery to stamp to delivered product (can we delete this note) + :param product_tagging: + :return: + """ + ''' + + Note: borrowed from user_tags + @param product_id - + @param delivery_time - + ''' + + new_doc = dict() + doc = dict() + metadata = dict() + metadata["tags"] = "deprecated" + doc["metadata"] = metadata + new_doc["doc"] = doc + + ES.update(index=index, doc_type="acquisition-S1-IW_SLC", id=_id, body=new_doc) + return True + + +if __name__ == "__main__": + ''' + Main program that find IPF version for acquisition + ''' + # txt = open("deprecate_acq.txt", "r") + # for acq in txt: + # acq_id = acq.strip() + # update_document(_id=acq_id) diff --git a/scrape_apihub_opensearch.py b/scrape_apihub_opensearch.py index 23b13af..5c548c5 100755 --- a/scrape_apihub_opensearch.py +++ b/scrape_apihub_opensearch.py @@ -22,7 +22,7 @@ from osaka.main import get # from notify_by_email import send_email - +from deprecate_acquisition import deprecate_document # disable warnings for SSL verification requests.packages.urllib3.disable_warnings(InsecureRequestWarning) @@ -38,11 +38,13 @@ log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s" logging.basicConfig(format=log_format, level=logging.INFO) + class LogFilter(logging.Filter): def filter(self, record): if not hasattr(record, 'id'): record.id = '--' return True + logger = logging.getLogger('scrape_apihub_opensearch') logger.setLevel(logging.INFO) logger.addFilter(LogFilter()) @@ -71,6 +73,23 @@ def get_timestamp_for_filename(time): return time +def extract_datetime_from_metadata(ts): + ''' + converts timestamp string to python datetime object + :param ts: string + :return: datetime object + ''' + time_formats = ['%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S.%f%z', + '%Y-%m-%d %H:%M:%S.%fZ', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S.%f%z'] + for time_format in time_formats: + try: + extracted_date = datetime.strptime(ts, time_format) + return extracted_date + except RuntimeError as e: + pass + raise RuntimeError("Could not extract datetime object from timestamp: %s" % ts) + + def get_accurate_times(filename_str, starttime_str, endtime_str): ''' Use the seconds from the start/end strings to append to the input filename timestamp to keep accuracy @@ -122,6 +141,7 @@ def list_status(starttime, endtime, prods_count, prods_missing, ids_by_track, ds prods_count) logger.info(msg) + def massage_result(res): """Massage result JSON into HySDS met json.""" @@ -210,7 +230,8 @@ def create_acq_dataset(ds, met, root_ds_dir=".", browse=False): id = "acquisition-{}_{}_{}_{}-esa_scihub".format(met["platform"],get_timestamp_for_filename(met["sensingStart"]), met["track_number"], met["sensoroperationalmode"]) root_ds_dir = os.path.abspath(root_ds_dir) ds_dir = os.path.join(root_ds_dir, id) - if not os.path.isdir(ds_dir): os.makedirs(ds_dir, 0755) + if not os.path.isdir(ds_dir): + os.makedirs(ds_dir, 0755) # append source to met met['query_api'] = "opensearch" @@ -259,6 +280,14 @@ def rhead(url): def get_existing_acqs(start_time, end_time, location=False): + ''' + :param start_time: str, timestamp + :param end_time: str, timestamp + :param location: coordinates array + :return: + acq_ids: list[str], list of esa UUID's + mission_data_id_store: { : { id: str, ingestiondate: str } } + ''' """ This function would query for all the acquisitions that temporally and spatially overlap with the AOI @@ -312,6 +341,8 @@ def get_existing_acqs(start_time, end_time, location=False): query["query"]["filtered"]["filter"] = geo_shape acq_ids = [] + mission_data_id_store = dict() # dictionary of missiondatatakeid containing UUID, ingestion date and _id + rest_url = app.conf["GRQ_ES_URL"][:-1] if app.conf["GRQ_ES_URL"].endswith('/') else app.conf["GRQ_ES_URL"] url = "{}/{}/_search?search_type=scan&scroll=60&size=10000".format(rest_url, index) r = requests.post(url, data=json.dumps(query)) @@ -335,8 +366,18 @@ def get_existing_acqs(start_time, end_time, location=False): for item in hits: acq_ids.append(item.get("_source").get("metadata").get("id")) + missiondatatakeid = item['_source']['metadata'].get('missiondatatakeid') + + if missiondatatakeid: + acq_id = item['_source']['metadata'].get('id') + ingestion_date = item['_source']['metadata'].get('ingestiondate') + mission_data_id_store[missiondatatakeid] = { + 'id': acq_id, + '_id': item['_id'], + 'ingestionDate': ingestion_date, + } - return acq_ids + return acq_ids, mission_data_id_store def scrape(ds_es_url, ds_cfg, starttime, endtime, polygon=False, user=None, password=None, @@ -345,7 +386,8 @@ def scrape(ds_es_url, ds_cfg, starttime, endtime, polygon=False, user=None, pass # get session session = requests.session() - if None not in (user, password): session.auth = (user, password) + if None not in (user, password): + session.auth = (user, password) # set query if purpose == "scrape": @@ -357,9 +399,10 @@ def scrape(ds_es_url, ds_cfg, starttime, endtime, polygon=False, user=None, pass if polygon: query += ' ( footprint:"Intersects({})")'.format(convert_to_wkt(polygon)) - existing_acqs = get_existing_acqs(start_time=starttime, end_time=endtime, location=json.loads(polygon)) + existing_acqs, acq_key_value_store = get_existing_acqs(start_time=starttime, end_time=endtime, + location=json.loads(polygon)) else: - existing_acqs = get_existing_acqs(start_time=starttime, end_time=endtime) + existing_acqs, acq_key_value_store = get_existing_acqs(start_time=starttime, end_time=endtime) # query prods_all = {} @@ -374,24 +417,33 @@ def scrape(ds_es_url, ds_cfg, starttime, endtime, polygon=False, user=None, pass logger.info("query: %s" % json.dumps(query_params, indent=2)) response = session.get(url, params=query_params, verify=False) logger.info("query_url: %s" % response.url) + if response.status_code != 200: logger.error("Error: %s\n%s" % (response.status_code,response.text)) response.raise_for_status() results = response.json() + if total_results_expected is None: total_results_expected = int(results['feed']['opensearch:totalResults']) entries = results['feed'].get('entry', None) - if entries is None: break + + if entries is None: + break + with open('res.json', 'w') as f: f.write(json.dumps(entries, indent=2)) - if isinstance(entries, dict): entries = [ entries ] # if one entry, scihub doesn't return a list + + if isinstance(entries, dict): + entries = [entries] # if one entry, scihub doesn't return a list count = len(entries) offset += count loop = True if count > 0 else False logger.info("Found: {0} results".format(count)) + for met in entries: - try: massage_result(met) - except Exception, e: + try: + massage_result(met) + except Exception as e: logger.error("Failed to massage result: %s" % json.dumps(met, indent=2, sort_keys=True)) logger.error("Extracted entries: %s" % json.dumps(entries, indent=2, sort_keys=True)) raise @@ -411,6 +463,31 @@ def scrape(ds_es_url, ds_cfg, starttime, endtime, polygon=False, user=None, pass ids_by_track.setdefault(met['track_number'], []).append(met['id']) + # FINDING ANY DUPLICATE ACQS WITH MISSIONDATATAKEID + scihub_missiontakeid = met.get('missiondatatakeid') # missiondatatakeid helps find duplicate acqs + existing_acq = acq_key_value_store.get(scihub_missiontakeid) # if we already have the acq + + if not scihub_missiontakeid or not existing_acq: + continue + + logger.info('missiondatatakeid found in our system: %s' % scihub_missiontakeid) + logger.info('existing acquisition: {}'.format(json.dumps(existing_acq, indent=2))) + + # existing ingestionDate from elasticsearch + es_ingestion_date = extract_datetime_from_metadata(existing_acq['ingestionDate']) + scihub_ingestion_date = extract_datetime_from_metadata(met['ingestiondate']) + + if scihub_ingestion_date <= es_ingestion_date: + # remove scihub's ID from products missing list if scihub's record is older than our own record + logger.info('scihub acq %s is older than existing acq, removing from prods_missing' % met['id']) + prods_missing.remove(met['id']) + else: + "mark older acq in elasticsearch as deprecated" + _id = existing_acq['_id'] + index = 'grq_v2.0_acquisition-s1-iw_slc' + deprecate_document(index, _id) + logger.info('deprecated acq: %s, index: %s' % (_id, index)) + # don't clobber the connection time.sleep(3) From 4fe0756f4fd1d61c85f81b40ea159b43e8581ff7 Mon Sep 17 00:00:00 2001 From: DustinKLo Date: Thu, 6 Jun 2019 10:04:12 -0700 Subject: [PATCH 2/4] added extra if constraint in checking if acq exists --- scrape_apihub_opensearch.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scrape_apihub_opensearch.py b/scrape_apihub_opensearch.py index 5c548c5..d810133 100755 --- a/scrape_apihub_opensearch.py +++ b/scrape_apihub_opensearch.py @@ -479,8 +479,9 @@ def scrape(ds_es_url, ds_cfg, starttime, endtime, polygon=False, user=None, pass if scihub_ingestion_date <= es_ingestion_date: # remove scihub's ID from products missing list if scihub's record is older than our own record - logger.info('scihub acq %s is older than existing acq, removing from prods_missing' % met['id']) - prods_missing.remove(met['id']) + if met['id'] in prods_missing: + logger.info('scihub acq %s is older than existing acq, removing from prods_missing' % met['id']) + prods_missing.remove(met['id']) else: "mark older acq in elasticsearch as deprecated" _id = existing_acq['_id'] From e521020f0ed9ef1b566b8d0705bdc9732847a61f Mon Sep 17 00:00:00 2001 From: Malarout Date: Mon, 10 Jun 2019 14:22:30 -0700 Subject: [PATCH 3/4] malarout: Updated ASF query to account for duplicates --- ipf_version.py | 105 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 31 deletions(-) diff --git a/ipf_version.py b/ipf_version.py index f36b2d4..44515d1 100644 --- a/ipf_version.py +++ b/ipf_version.py @@ -27,6 +27,48 @@ def filter(self, record): _type = None ES = elasticsearch.Elasticsearch(es_url) +job_es_url = app.conf["JOBS_ES_URL"] +MOZART_ES = elasticsearch.Elasticsearch(job_es_url) +_job_index = "job_status-current" +_job_doc_type = "job" + +def find_duplicate_jobs(job_type, acq_id): + query = { + "query": { + "bool": { + "must": [ + { + "term": { + "type": job_type + } + } + ] + } + } +} + MOZART_ES.search(index=_job_index, doc_type=_job_doc_type, body=query) + return + +def evaluate_short_circuit(acq_id): + """ + Evaluates whether current job should short circuit and exit + Will do so in case there is other duplicate job in systerm with a lower retry count + :param acq_id: + :return: + """ + # find current job's retry count, if exists + job_json = json.loads(open("_job.json","r").read()) + retry_count = int(job_json.get("retry_count", 0)) + job_type = job_json.get("type") + + if retry_count == 0: + return + else: + # see if any duplicate jobs in system with lower retry count + results = find_duplicate_jobs(job_type, acq_id) + + + return def check_ipf_avail(id): result = ES.search(index="grq",body={"query": {"term": {"_id": id}}}) @@ -104,19 +146,23 @@ def get_dataset_json(met, version): } -def extract_asf_ipf(id): +def extract_asf_ipf(id, start_time, end_time): ipf = None try: # query the asf search api to find the download url for the .iso.xml file - request_string = 'https://api.daac.asf.alaska.edu/services/search/param?platform=SA,SB&processingLevel=METADATA_SLC' \ - '&granule_list=%s&output=json' % id + request_string = "https://api.daac.asf.alaska.edu/services/search/param?platform=SA,SB&processingLevel=METADATA_SLC" \ + "&start={}&end={}&output=json".format(start_time, end_time) logger.info("ASF request URL: {}".format(request_string)) response = requests.get(request_string) response.raise_for_status() results = json.loads(response.text) - logger.info("Response from ASF: {}".format(response.text)) + logger.info("Response for acquistion {} from ASF: {}".format(id, response.text)) # download the .iso.xml file, assumes earthdata login credentials are in your .netrc file - response = requests.get(results[0][0]['downloadUrl']) + hits = len(results[0]) + if hits == 1: + response = requests.get(results[0][0]['downloadUrl']) + # else, figure out how to handle multiple results + response.raise_for_status() if response.status_code != 200: raise Exception("Request to ASF failed with status {}. {}".format(response.status_code, request_string)) @@ -181,34 +227,31 @@ def extract_scihub_ipf(met): _type = ctx.get("dataset_type") endpoint = ctx["endpoint"] + evaluate_short_circuit(id) + if check_ipf_avail(id): - logger.error("Acquisition already has IPF, not processing with scraping") - with open('_alt_error.txt', 'w') as f: - f.write("Acquisition already has IPF, not processing with scraping for {}".format(id)) - f.close() - sys.exit(1) - - if endpoint == "asf": - try: - ipf = extract_asf_ipf(met.get("identifier")) - if ipf is None: - raise Exception - except Exception: - with open('_alt_error.txt', 'w') as f: - f.write("Failed to extract IPF version from ASF for {}".format(id)) - with open('_alt_traceback.txt', 'w') as f: - f.write("%s\n" % traceback.format_exc()) - raise Exception("Failed to extract IPF version from ASF for {}".format(id)) - else: - try: + logger.error("Acquisition already has IPF, not processing with scraping for {}".format(id)) + sys.exit(0) + + try: + if endpoint == "asf": + ipf = extract_asf_ipf(id=met.get("identifier"), start_time=met.get("sensingStart"), end_time=met. + get("sensingStop")) + else: ipf = extract_scihub_ipf(met) - if ipf is None: - raise Exception - except Exception: + if ipf is None: with open('_alt_error.txt', 'w') as f: - f.write("Failed to extract IPF version from SciHub for {}".format(id)) - with open('_alt_traceback.txt', 'w') as f: - f.write("%s\n" % traceback.format_exc()) - raise Exception("Failed to extract IPF version from SciHub for {}".format(id)) + f.write("Retrieved a null IPF version from {} for {}".format(endpoint, id)) + raise Exception + except Exception: + with open('_alt_error.txt', 'w') as f: + f.write("Failed to extract IPF version from {} for {}".format(endpoint, id)) + with open('_alt_traceback.txt', 'w') as f: + f.write("%s\n" % traceback.format_exc()) + raise Exception("Failed to extract IPF version from {} for {}".format(endpoint, id)) + + if check_ipf_avail(id): + logger.info("Acquisition already has IPF, not updating again") + sys.exit(0) update_ipf(id, ipf) From 2ec9cb97dd35d891d3aca5b5e0d4643b21699e52 Mon Sep 17 00:00:00 2001 From: Malarout Date: Tue, 18 Jun 2019 12:42:57 -0700 Subject: [PATCH 4/4] Update 1245: Added short circuit to IPF scrape jobs --- AOI_based_ipf_submitter.py | 7 ++- docker/hysds-io.json.ipf-scraper-asf | 5 ++ docker/hysds-io.json.ipf-scraper-scihub | 5 ++ docker/job-spec.json.ipf-scraper-asf | 4 ++ docker/job-spec.json.ipf-scraper-scihub | 4 ++ ipf_version.py | 64 +++++++++++++++++++------ 6 files changed, 74 insertions(+), 15 deletions(-) diff --git a/AOI_based_ipf_submitter.py b/AOI_based_ipf_submitter.py index a433853..a0a6055 100644 --- a/AOI_based_ipf_submitter.py +++ b/AOI_based_ipf_submitter.py @@ -121,6 +121,11 @@ def submit_ipf_scraper(acq, tag, endpoint): "from": "value", "value": endpoint }, + { + "name": "force", + "from": "value", + "value": False + }, { "name": "ds_cfg", "from": "value", @@ -164,7 +169,7 @@ def submit_ipf_scraper(acq, tag, endpoint): print("Date:" + acq.get("metadata").get("sensingStart")) acq_date = acq.get("metadata").get("sensingStart") start_time = dateutil.parser.parse(acq_date) - if start_time.replace(tzinfo=None) < datetime.now() - timedelta(days=1): + if start_time.replace(tzinfo=None) < datetime.now() - timedelta(days=2): endpoint = "asf" else: endpoint = "scihub" diff --git a/docker/hysds-io.json.ipf-scraper-asf b/docker/hysds-io.json.ipf-scraper-asf index 4eefda6..3ee44e5 100644 --- a/docker/hysds-io.json.ipf-scraper-asf +++ b/docker/hysds-io.json.ipf-scraper-asf @@ -23,6 +23,11 @@ "from": "value", "value": "asf" }, + { + "name": "force", + "from": "submitter", + "type": "boolean" + }, { "name": "ds_cfg", "from": "value", diff --git a/docker/hysds-io.json.ipf-scraper-scihub b/docker/hysds-io.json.ipf-scraper-scihub index 3d822b9..8b604f6 100644 --- a/docker/hysds-io.json.ipf-scraper-scihub +++ b/docker/hysds-io.json.ipf-scraper-scihub @@ -23,6 +23,11 @@ "from": "value", "value": "scihub" }, + { + "name": "force", + "from": "submitter", + "type": "boolean" + }, { "name": "ds_cfg", "from": "value", diff --git a/docker/job-spec.json.ipf-scraper-asf b/docker/job-spec.json.ipf-scraper-asf index 5b6a97d..1c47817 100644 --- a/docker/job-spec.json.ipf-scraper-asf +++ b/docker/job-spec.json.ipf-scraper-asf @@ -31,6 +31,10 @@ "destination": "context" }, + { + "name": "force", + "destination": "context" + }, { "name": "ds_cfg", "destination": "positional" diff --git a/docker/job-spec.json.ipf-scraper-scihub b/docker/job-spec.json.ipf-scraper-scihub index 859414a..1611f65 100644 --- a/docker/job-spec.json.ipf-scraper-scihub +++ b/docker/job-spec.json.ipf-scraper-scihub @@ -31,6 +31,10 @@ "destination": "context" }, + { + "name": "force", + "destination": "context" + }, { "name": "ds_cfg", "destination": "positional" diff --git a/ipf_version.py b/ipf_version.py index 44515d1..10aaabf 100644 --- a/ipf_version.py +++ b/ipf_version.py @@ -32,6 +32,7 @@ def filter(self, record): _job_index = "job_status-current" _job_doc_type = "job" + def find_duplicate_jobs(job_type, acq_id): query = { "query": { @@ -41,13 +42,23 @@ def find_duplicate_jobs(job_type, acq_id): "term": { "type": job_type } + }, + { + "query_string": { + "query": "\"{}\"".format(acq_id), + "default_operator": "OR" + } } ] } - } + }, + "fields": [ + "job.retry_count", + "status" + ] } - MOZART_ES.search(index=_job_index, doc_type=_job_doc_type, body=query) - return + return MOZART_ES.search(index=_job_index, doc_type=_job_doc_type, body=query) + def evaluate_short_circuit(acq_id): """ @@ -62,16 +73,31 @@ def evaluate_short_circuit(acq_id): job_type = job_json.get("type") if retry_count == 0: - return + return False else: # see if any duplicate jobs in system with lower retry count results = find_duplicate_jobs(job_type, acq_id) + count = results.get("hits").get("total") + if count == 0: + return False + else: + for hit in results.get("hits").get("hits"): + job_id = hit.get("_id") + status = hit.get("fields").get("status")[0] + if status == "job-completed": + logger.info("Found completed job in the system but the IPF is still null. Please Investigate.") + raise Exception("Found completed job in the system but the IPF is still null. Please Investigate.") + else: + other_retry_count = hit.get("fields").get("job.retry_count", [0])[0] + if other_retry_count < retry_count: + logger.info("Found another job: {}, with lower retry_count of {} and status {}" + .format(job_id, other_retry_count, status)) + return True + return False - return - def check_ipf_avail(id): - result = ES.search(index="grq",body={"query": {"term": {"_id": id}}}) + result = ES.search(index="grq", body={"query": {"term": {"_id": id}}}) ipf_version = result.get("hits").get("hits")[0].get("_source").get("metadata").get("processing_version", None) if ipf_version is not None: @@ -226,12 +252,23 @@ def extract_scihub_ipf(met): _index = ctx.get("index") _type = ctx.get("dataset_type") endpoint = ctx["endpoint"] - - evaluate_short_circuit(id) - - if check_ipf_avail(id): - logger.error("Acquisition already has IPF, not processing with scraping for {}".format(id)) - sys.exit(0) + force = ctx["force"] + + if not force: + try: + if check_ipf_avail(id): + logger.info("Acquisition already has IPF, not processing with scraping for {}".format(id)) + sys.exit(0) + + if evaluate_short_circuit(id): + logger.info("IPF scrape job with lower retry count exists for this acquisition, " + "not submitting scrape job") + sys.exit(0) + except Exception as ex: + with open('_alt_error.txt', 'w') as f: + f.write("{}".format(ex)) + with open('_alt_traceback.txt', 'w') as f: + f.write("{}\n".format(traceback.format_exc())) try: if endpoint == "asf": @@ -248,7 +285,6 @@ def extract_scihub_ipf(met): f.write("Failed to extract IPF version from {} for {}".format(endpoint, id)) with open('_alt_traceback.txt', 'w') as f: f.write("%s\n" % traceback.format_exc()) - raise Exception("Failed to extract IPF version from {} for {}".format(endpoint, id)) if check_ipf_avail(id): logger.info("Acquisition already has IPF, not updating again")