Gsaksena mmd2 #67
base: master
Changes from all commits
822320e
44ffe88
b4e9f8e
cbaafd7
518409f
9ce8811
e2fb22b
7da50d1
3ec8fa1
be4f31c
855d011
cf9b5f8
f9e3e9d
2bf501b
7b8e95c
2557ad5
2155b01
03fe15d
613c77c
4f4f39e
57b3c7d
47c5ff1
2949a39
ecf5ecf
b61aa36
6566b16
97886da
1c08992
4653128
003518b
40a5524
96f9b32
First changed file (in the hunks below, old and new versions of a modified line appear together, old line first):
```
@@ -57,23 +57,46 @@ def execute(self):
        self.config_customize()
        self.config_finalize()

        #TODO perhaps refactor for better encapsulation, moving part to
        # GDCtool.config_initialize() and part to gdc_mirror.config_customize().
        # Though it is nice to have all the logic for setting datestamp in one place.
        #TODO variable renaming - datestamp_required and datestamp, to make them reflect current usage
        datestamp = self.options.datestamp
        if self.datestamp_required:
            datestamp = self.options.datestamp
            if not datestamp:
                datestamp = 'latest'
            #non-gdc_mirror case

            existing_dates = self.datestamps()  # ascending sort order
            if len(existing_dates) == 0:
                raise ValueError("No datestamps found, use upstream tool first")

            if not datestamp:
                #default value = 'latest'
                datestamp = 'latest'

            if datestamp == 'latest':
                datestamp = existing_dates[-1]
                # find last datestamp in existing_dates that is in date format
                for d in reversed(existing_dates):
                    if common.DATESTAMP_REGEX.match(d) is not None:
                        datestamp = d
                        break
                else:
                    #TODO make this error message more helpful
                    raise ValueError("Looking for latest datestamp, but no datestamps found in correct format")
            elif datestamp not in existing_dates:
                raise ValueError("Requested datestamp not present in "
                                 + self.config.datestamps + "\n"
                                 + "Existing datestamps: " + repr(existing_dates))
        else:
            datestamp = time.strftime('%Y_%m_%d', time.localtime())
            #gdc_mirror case
            if not datestamp:
                # default value = today's datestamp
                datestamp = common.datestamp()
            elif datestamp == 'pool':
                pass
            else:
                #other strings such as <yyyy-mm-dd>, 'latest', valid variable names, and everything else are not allowed
                raise ValueError("For gdc_mirror, date must be blank or 'pool'")
```
Comment: I thought the plan was to allow user-defined tags, and that anything besides 'latest' or a datestamp would work like 'pool'.

Reply: Dropped other user-defined tags because they are not needed to meet the requirement. YAGNI. Easy to add later; it makes the code simpler and easier to document for today.
```
        self.datestamp = datestamp
        self.init_logging()
```
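To make the new --date semantics easier to follow, here is a minimal, hypothetical sketch of the same resolution rules as a standalone function; the regex and the helper names are stand-ins for common.DATESTAMP_REGEX and common.datestamp(), and this illustrates the behavior described above rather than reproducing the PR's code.

```python
import re
import time

# Assumed shape of common.DATESTAMP_REGEX (illustrative only).
DATESTAMP_REGEX = re.compile(r'^\d{4}_\d{2}_\d{2}$')

def resolve_datestamp(requested, datestamp_required, existing_dates):
    """Sketch of the reworked --date resolution rules.

    requested          : value of --date (None, 'latest', 'pool', or YYYY_MM_DD)
    datestamp_required : True for downstream tools, False for gdc_mirror
    existing_dates     : ascending list of datestamps recorded by earlier runs
    """
    if datestamp_required:
        # Downstream tools must reuse a snapshot that already exists.
        if not existing_dates:
            raise ValueError("No datestamps found, use upstream tool first")
        requested = requested or 'latest'
        if requested == 'latest':
            # Newest entry that looks like a date (skips tags such as 'pool').
            for d in reversed(existing_dates):
                if DATESTAMP_REGEX.match(d):
                    return d
            raise ValueError("No datestamps found in correct format")
        if requested not in existing_dates:
            raise ValueError("Requested datestamp not present")
        return requested
    else:
        # gdc_mirror: blank means "today", 'pool' means a date-agnostic snapshot.
        if not requested:
            return time.strftime('%Y_%m_%d', time.localtime())
        if requested == 'pool':
            return 'pool'
        raise ValueError("For gdc_mirror, date must be blank or 'pool'")
```

For example, resolve_datestamp(None, True, ['2024_01_02', 'pool']) returns '2024_01_02' (the newest entry in date form), while resolve_datestamp('pool', False, []) is accepted only because the gdc_mirror branch explicitly allows 'pool'.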
```
@@ -98,12 +121,12 @@ def config_add_args(self):
        cli = self.cli
        cli.add_argument('--config', nargs='+', type=argparse.FileType('r'),
                         help='One or more configuration files')

        if self.datestamp_required:
            cli.add_argument('--date', nargs='?', dest='datestamp',
                             help='Use data from a given dated version (snapshot) of '
                                  'GDC data, specified in YYYY_MM_DD form. If omitted, '
                                  'the latest available snapshot will be used.')
        cli.add_argument('--date', nargs='?', dest='datestamp',
                         help='Use data from a given dated version (snapshot) of '
                              'GDC data. The special value "pool" can be used for date '
                              'agnostic snapshots, typically along with "--append". '
                              'For tools other than gdc_mirror, you can give a date in YYYY_MM_DD form.'
                              'If omitted, the latest available snapshot will be used.')
        cli.add_argument('--cases', nargs='+', metavar='case_id',
                         help='Process data only from these GDC cases')
        cli.add_argument('--categories',nargs='+',metavar='category',
```
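As a quick illustration of how the reworked --date flag parses, here is a self-contained argparse sketch; the real CLI defines many more options inside config_add_args, and the program name below is invented.

```python
import argparse

# Standalone reproduction of just the --date option, for illustration only.
cli = argparse.ArgumentParser(prog='gdc_tool_sketch')
cli.add_argument('--date', nargs='?', dest='datestamp',
                 help='YYYY_MM_DD snapshot, "pool", or omit for the default')

print(cli.parse_args([]).datestamp)                        # None: tool falls back to 'latest' or today
print(cli.parse_args(['--date', 'pool']).datestamp)        # 'pool': date-agnostic snapshot
print(cli.parse_args(['--date', '2024_01_02']).datestamp)  # explicit snapshot date
```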
Second changed file:
```
@@ -81,6 +81,7 @@ def dice(self):
        program = config.programs[0]
        diced_prog_root = os.path.join(config.dice.dir, program)
        mirror_prog_root = os.path.join(config.mirror.dir, program)
        prog_status_tally = Counter()

        # Ensure no simultaneous mirroring/dicing
        with common.lock_context(diced_prog_root, "dice"), \
```
```
@@ -150,9 +151,10 @@ def dice(self):
                if not os.path.isdir(diced_meta_dir):
                    os.makedirs(diced_meta_dir)
                diced_meta_file = os.path.join(diced_meta_dir, diced_meta_fname)
                diced_meta_file_partial = diced_meta_file + '.partial'

                # Count project annotations
                with open(diced_meta_file, 'w') as mf:
                with open(diced_meta_file_partial, 'w') as mf:
                    # Header
                    META_HEADERS = ['case_id', 'tcga_barcode', 'sample_type',
                                    'annotation', 'file_name', 'center',
```
```
@@ -164,17 +166,23 @@ def dice(self):
                    for tcga_id in tcga_lookup:
                        # Dice single sample files first
                        for file_d in viewvalues(tcga_lookup[tcga_id]):
                            dice_one(file_d, trans_dict, raw_project_root,
                            dice_one_status = dice_one(file_d, trans_dict, raw_project_root,
                                     diced_project_root, mfw,
                                     dry_run=self.options.dry_run,
                                     force=self.force)
                            prog_status_tally[dice_one_status] += 1
```
Comment: I do like the tallying.
```
                    #Then dice the multi_sample_files
                    for file_d in multi_sample_files:
                        dice_one(file_d, trans_dict, raw_project_root,
                        dice_one_status = dice_one(file_d, trans_dict, raw_project_root,
                                 diced_project_root, mfw,
                                 dry_run=self.options.dry_run,
                                 force=self.force)
                        prog_status_tally[dice_one_status] += 1

                # move completed metadata file to final location
                # note it may include diced files that had errors
                os.rename(diced_meta_file_partial, diced_meta_file)
```
Comment: This code (and the comment directly above) should not execute when diced files had errors; by definition, the file should remain .partial. This is grounds for not passing review. Seems like all that's needed is:

```
if 'error' not in prog_status_tally:
    os.rename(...)
```

Comment: With the code as-is, a failure will cause attempts at running downstream steps to also fail. With your suggested change, the downstream step will pass but silently use the wrong (old) data.

Comment: Thanks for explaining, but I don't understand the logic/idea: a .partial is supposed to indicate a failure state, and the absence of .partial represents a success state. But you seem to be saying the opposite. Downstream steps that observe an upstream .partial (failure state) should fail immediately, no? How is the current implementation "detecting failure," then, from an apparently successful state (as indicated by the absence of .partial)?

Comment: Downstream steps look only for the main output file, not .partial, .error, etc. This respects encapsulation. (Also, I was getting dizzy when considering how to correctly and intuitively handle all the permutations of various files existing vs not.) A .partial is currently being used just to ensure that downstream steps never get a file whose writing was interrupted by a Ctrl-C. If some files are listed that do not exist, you will get a nonzero exit code, and the next step will also fail with a nonzero exit code.

Comment: So, in the case cited here, the metadata .partial is just atomicity for file-level operations, so we know whether a catastrophic event occurred (like running out of disk space, or Ctrl-C)? Not quite sure what you're getting at about how not looking for .partial "respects encapsulation". But earlier you said that removing .partial only when all files have diced causes downstream steps to "pass" by silently using wrong (old) data; if the only data that actually passed through the dicer is "old", then isn't that the correct thing to do? Can you explain what a downstream failure looks like, then, if we keep the code as is? Does gdc_loadfile (a) refuse to launch at all, or (b) launch and process everything it can, but skip the files that failed to dice (and flag them with warning messages, etc.)?

Comment: Actually, allowing the loadfile to be created when the underlying data is not there makes the entire situation worse: it will not be caught when gsutil is used to push data into the cloud (because it acts like rsync, and doesn't know what should or shouldn't be there), but rather will cause the merge pipelines to mysteriously fail, and that is a vastly harder situation to debug. Moreover, each of the tools outputs what version of data it is operating upon, e.g. in the dicer log, so I don't see the problem with charging ahead on old data; one is clearly notified, and if one chooses not to pay attention to that notification then one is not being a careful analyst.

But charging ahead on old data would be prevented entirely by using .error (or .partial) on all files (data and metadata/json), in the sense that if gdc_loadfile sees gdc_dice.error then it complains and stops; if gdc_dice sees gdc_mirror.error it complains and stops; and so on. What am I missing here? You want to avoid users making inadvertent mistakes, and this is a way to help that, right? The only way to remedy a .error file is to remove it and attempt to re-do the step that errored. And if you don't want to clear the error, then your only option is to manually specify an older datestamp (or tag), in which case you are not confused about what data is being used.

Comment: This failing case would be better if gdc_loadfile checked for the existence of every file before moving forward, but that would make the common (passing) case much slower. In the end it is a matter of tradeoffs: streamline and bulletproof the use cases you care about most, and for the less central use cases try not to unpleasantly surprise the user.

Comment: We may be talking past one another, or there are (unstated?) assumptions lurking in our perspectives. In my view, if, when an error occurs, gdc_dice were to output metadata.json.error instead of metadata.json, then downstream tools would not have to do anything at all; they would just exit immediately. You seem to be opposed to this, and suggest that it is "better" to check every file, but I don't understand why. If there is no .error suffix then gdc_loadfile should not have to check any files, right? The absence of .error from gdc_dice is the contract to gdc_loadfile that everything within the metadata.json indeed exists, as gdc_dice will have already iterated over the entire suite of files (by necessity).

Comment: If you address issue 1, making the system logic sufficiently reliable for your satisfaction, I have no real problem with using .error files.

Comment: Thank you, Gordon. Using 'date' as synonymous with 'tag' ...
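The pattern being debated above, writing to a .partial file and deciding when (and as what) to publish it, can be sketched independently of the gdctools code. The helper below is a minimal, hypothetical illustration of the three policies discussed in this thread; none of these names exist in the PR.

```python
import os

def finish_metadata(partial_path, final_path, status_tally, policy='always'):
    """Sketch of the completion policies debated above.

    policy='always'         : PR behavior; publish even if some files errored, and let
                              downstream steps fail when listed files turn out missing.
    policy='on_success'     : reviewer's first suggestion; publish only on a clean run,
                              otherwise leave the .partial file in place.
    policy='error_sentinel' : later idea in the thread; on failure, publish an explicit
                              .error file that downstream tools can detect and stop on.
    """
    clean = not status_tally.get('error')
    if policy == 'always' or clean:
        os.rename(partial_path, final_path)
    elif policy == 'error_sentinel':
        os.rename(partial_path, final_path + '.error')
    # policy == 'on_success' with errors present: leave the .partial as-is
```

In every variant the atomic rename still guarantees that a Ctrl-C halfway through writing can never leave a truncated file at the final path.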
```
            # Bookkeeping code -- write some useful tables
            # and figures needed for downstream sample reports.
```
```
@@ -217,7 +225,13 @@ def dice(self):
            _write_combined_counts(all_counts_file, all_counts, all_totals)
            _link_to_prog(all_counts_file, datestamp, diced_prog_root)

            logging.info("Dicing completed successfuly")
            logging.info(str(prog_status_tally))
            if prog_status_tally['error'] == 0:
                logging.info("Dicing completed successfuly")
            else:
                logging.error("One or more diced files FAILED")
                raise Exception("One or more diced files FAILED")

    def execute(self):
        super(gdc_dice, self).execute()
```
```
@@ -361,10 +375,11 @@ def dice_one(file_dict, translation_dict, mirror_proj_root, diced_root,
    true, a debug message will be displayed instead of performing the actual
    dicing operation.
    """
    dice_one_status = 'error'
    mirror_path = meta.mirror_path(mirror_proj_root, file_dict)
    if not os.path.isfile(mirror_path):
        # Bad, this means there are integrity issues
        raise ValueError("Expected mirror file missing: " + mirror_path)
        logging.warning("Expected mirror file missing: " + mirror_path)
    else:
        ## Get the right annotation and converter for this file
        annot, convert = get_annotation_converter(file_dict, translation_dict)
```
```
@@ -380,32 +395,49 @@ def dice_one(file_dict, translation_dict, mirror_proj_root, diced_root,
            already_diced = all(os.path.isfile(p) for p in expected_paths)
            if force or not already_diced:
                logging.info("Dicing file " + mirror_path)
                convert(file_dict, mirror_path, dice_path)
                try:
                    convert(file_dict, mirror_path, dice_path)
                    dice_one_status = 'pass'
                except Exception as e:
                    logging.warning('Dice converter failed: %s'%str(e))
            else:
                logging.info("Skipping file " + mirror_path + " (already diced)")
                dice_one_status = 'cached'

            append_diced_metadata(file_dict, expected_paths,
            duplicate_detected = append_diced_metadata(file_dict, expected_paths,
```
Comment: This seems to return the opposite value of the spirit of the call, by returning True for the bad condition (the append failed due to having a duplicate); this prevents the code from passing review.
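To illustrate the objection, here is a small hypothetical sketch (names and values invented, not the PR's code) of the convention being flagged, where True signals the bad outcome, alongside an alternative that returns an explicit status string instead.

```python
from collections import Counter, defaultdict

# Module-level tally of (barcode, annotation) pairs, mirroring the PR's approach.
rows_seen = defaultdict(Counter)

def record_row(barcode, annot):
    """Returns True when a DUPLICATE was detected, i.e. True means the bad case."""
    rows_seen[barcode][annot] += 1
    return rows_seen[barcode][annot] > 1

def record_row_status(barcode, annot):
    """Possible alternative: an explicit status avoids the inverted-boolean reading."""
    return 'duplicate' if record_row(barcode, annot) else 'ok'

print(record_row('SAMPLE-0001', 'mRNA_expression'))         # False: first sighting
print(record_row_status('SAMPLE-0001', 'mRNA_expression'))  # 'duplicate': second sighting
```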
```
                                   annot, meta_file_writer)
        else:
            # To verbose to log the entire json, log just log data_type and file_id
            warning_info = {
                'data_type' : file_dict["data_type"],
                'data_category' : file_dict["data_category"],
                'file_id' : file_dict["file_id"],
                'file_name': file_dict['file_name']
            }
            logging.warn('Unrecognized data:\n%s' % json.dumps(warning_info,
                         indent=2))
            if duplicate_detected:
                dice_one_status = 'error'
    else:
        dice_one_status = 'dry_run'

    return dice_one_status

def get_annotation_converter(file_dict, translation_dict):
    k = metadata_to_key(file_dict)
    dictkey = metadata_to_dictkey(file_dict)
    k = frozenset(dictkey.items())
    if k in translation_dict:
        return translation_dict[k]
    else:
        # FIXME: Gracefully handle this instead of creating a new annotation type
        warning_info = \
            """
            file_id: {0}
            file_name: {1}

            data_category: {data_category}
            data_type: {data_type}
            experimental_strategy: {experimental_strategy}
            platform: {platform}
            center_namespace: {center_namespace}
            tags: {tags}
            workflow_type: {workflow_type}
            """.format(file_dict["file_id"], file_dict['file_name'], **dictkey)

        logging.warn('Unrecognized data:\n%s' % warning_info)

        return "UNRECOGNIZED", None

def metadata_to_key(file_dict):
def metadata_to_dictkey(file_dict):
    """Converts the file metadata in file_dict into a key in the TRANSLATION_DICT"""
    # Required fields
    data_type = file_dict.get("data_type", '')
```
```
@@ -416,15 +448,18 @@ def metadata_to_key(file_dict):
    center_namespace = file_dict['center']['namespace'] if 'center' in file_dict else ''
    workflow_type = file_dict['analysis']['workflow_type'] if 'analysis' in file_dict else ''

    return frozenset({
    return {
        "data_type" : data_type,
        "data_category" : data_category,
        "experimental_strategy" : experimental_strategy,
        "platform" : platform,
        "tags" : tags,
        "center_namespace" : center_namespace,
        "workflow_type" : workflow_type
    }.items())
    }

# globally scoped variable, for persistence
count_by_tcga_barcode_by_annot = defaultdict(Counter)

def append_diced_metadata(file_dict, diced_paths, annot, meta_file_writer):
    '''Write one or more rows for the given file_dict using meta_file_writer.
```
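The reason for the frozenset round trip in this change: a plain dict cannot be used as a dictionary key because it is unhashable, while a frozenset of its items can. Splitting metadata_to_dictkey (returns the dict, whose fields can also feed the warning message) from the frozenset conversion in get_annotation_converter keeps both uses available. A tiny standalone demonstration, with made-up values rather than the real TRANSLATION_DICT:

```python
# A dict describing one file's metadata (values invented for illustration).
file_key = {
    "data_type": "Gene Expression Quantification",
    "data_category": "Transcriptome Profiling",
    "workflow_type": "HTSeq - FPKM",
}

# dicts are unhashable, so they cannot index another dict directly:
# translation = {file_key: "mRNA_expression"}   # would raise TypeError

# ... but a frozenset of their items is hashable and order-independent.
translation = {frozenset(file_key.items()): "mRNA_expression"}
print(translation[frozenset(file_key.items())])  # mRNA_expression
```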
```
@@ -433,7 +468,8 @@ def append_diced_metadata(file_dict, diced_paths, annot, meta_file_writer):
    meta_file_writer must be a csv.DictWriter
    '''

    duplicate_detected = False
    # These fields will be shared regardless of the number of diced files
    rowdict = {
        'annotation' : annot,
```
```
@@ -448,17 +484,24 @@ def append_diced_metadata(file_dict, diced_paths, annot, meta_file_writer):
        sample_type = None
        if meta.has_sample(file_dict):
            sample_type = meta.sample_type(file_dict)
        tcga_barcode = meta.tcga_id(file_dict)

        # Write row with csv.DictWriter.writerow()
        rowdict.update({
            'case_id' : meta.case_id(file_dict),
            'tcga_barcode' : meta.tcga_id(file_dict),
```
Comment: I recognize that this is old-ish code, but it should nevertheless aim to be agnostic to project, rather than explicitly referencing TCGA.
```
            'tcga_barcode' : tcga_barcode,
            'sample_type' : sample_type,
            'file_name' : diced_path,
            'is_ffpe' : meta.is_ffpe(file_dict)
        })

        meta_file_writer.writerow(rowdict)

        count_by_tcga_barcode_by_annot[tcga_barcode][annot] += 1
        if count_by_tcga_barcode_by_annot[tcga_barcode][annot] > 1:
            duplicate_detected = True
            logging.error('Multiple files contain data for the same sample: %s'%str(rowdict) )

    else:
        # Harder case, have to write a line for each unique file
        # We need to match the diced filenames back to the original samples
```
```
@@ -484,6 +527,14 @@ def append_diced_metadata(file_dict, diced_paths, annot, meta_file_writer):
            })
            meta_file_writer.writerow(rowdict)

            count_by_tcga_barcode_by_annot[tcga_barcode][annot] += 1
            if count_by_tcga_barcode_by_annot[tcga_barcode][annot] > 1:
                duplicate_detected = True
                logging.error('Multiple files contain data for the same sample: %s'%str(rowdict) )

    return duplicate_detected

def constrain(metadata, config):
    cases_chosen = set(config.cases)
    categories_chosen = config.categories
```
Comment: This actually means that there are no dates whatsoever.