-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhi_segmented_pipeline.py
229 lines (193 loc) · 10.5 KB
/
hi_segmented_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#!/usr/bin/env python
import collections
import configparser
import difflib
import filecmp
import glob
import logging
import os
import re
import readline
import shlex
import shutil
import sys
import time
from ast import literal_eval

import numpy
from ruffus import *

import cgatcore.experiment as E
from cgatcore import pipeline as P
def input_validation(params=None):
    """
    Auxiliary function to make sure input configuration is valid.

    Checks, in order, that the parameters name a readable ``configfile``,
    that the configured CASA directory contains a readable ``casa``, and
    that a non-blank ``project`` ID is set.

    :param params: parameter mapping to validate; defaults to the
        module-level ``cgatcore_params`` read from the pipeline yml file.
    :raises RuntimeError: if no ``configfile`` entry is present.
    :raises FileNotFoundError: if the configfile is not readable, or the
        ``casa`` entry is missing/empty or has no readable ``casa`` in it.
    :raises ValueError: if ``project`` is missing, None or blank.
    """
    if params is None:
        params = cgatcore_params
    # check whether 'configfile' was specified in pipeline.yml
    if 'configfile' not in params:
        raise RuntimeError(' Please specify a configfile in pipeline.yml ')
    # check whether configfile is readable
    if not os.access(params['configfile'], os.R_OK):
        raise FileNotFoundError(' Configuration file is required but not found.')
    # check casa path exists; a missing key gets the same error as a bad path
    # instead of an unexplained KeyError
    casa_dir = params.get('casa')
    if not casa_dir or not os.access(os.path.join(casa_dir, 'casa'), os.R_OK):
        raise FileNotFoundError(' CASA is required but not found. Check path in yml file.')
    # check project ID exists; an empty value in a YAML file loads as None,
    # not '', so both (and whitespace-only strings) are rejected
    project = params.get('project')
    if project is None or str(project).strip() == '':
        raise ValueError(' Project ID is required but not found.')
# Pipeline stages in execution order, each paired with the parameter keywords
# whose change should make that stage run again. Only the earliest affected
# stage has its ``<stage>.done`` marker removed. Presumably ruffus then
# re-runs the later stages too, because each later task takes the previous
# stage's marker as its input.
_PARAM_STAGE_KEYWORDS = (
    ('import_data', ['project_name', 'data_path', 'jvla', 'mstransform', 'keep_', 'hanning', 'chanavg']),
    ('flag_calib_split', ['src_dir', 'shadow_tol', 'quack_int', 'timecutoff', 'freqcutoff', 'rthresh', 'refant',
                          'no_rflag', 'no_tfcrop', 'fluxcal', 'fluxmod', 'man_mod', 'bandcal', 'phasecal', 'targets',
                          'target_names', 'mosaic', 'man_comb_spws']),
    ('dirty_cont_image', ['rest_freq', 'img_dir']),
    ('contsub_dirty_image', ['linefree_ch', 'fitorder', 'save_cont', 'line_ch']),
    ('clean_image', ['automask', 'multiscale', 'beam_scales', 'sefd', 'corr_eff', 'robust', 'thresh', 'pix_size',
                     'im_size', 'noise', 'phasecenter']),
    ('moment_zero', ['mom_thresh', 'mom_chans']),
    ('cleanup', ['cleanup_level']),
)


def _backup_path(configfile):
    """Return the path of the backup copy kept for *configfile*.

    The backup sits next to the configuration file and is named
    ``backup.<basename>``. For a bare filename this is ``backup.<configfile>``
    in the current directory. For a path with a directory component, plain
    ``'backup.' + configfile`` would point into a non-existent directory.
    """
    head, tail = os.path.split(configfile)
    return os.path.join(head, 'backup.' + tail)


def backup_pipeline_params(configfile=None):
    """
    Creates (overwrites) a copy of the pipeline parameters file.

    :param configfile: parameters file to back up; defaults to
        ``cgatcore_params['configfile']``.
    """
    if configfile is None:
        configfile = cgatcore_params['configfile']
    shutil.copyfile(configfile, _backup_path(configfile))


def _changed_lines(old_path, new_path):
    """Return the lines that differ between two text files, newline-joined.

    Both the removed lines of *old_path* and the added lines of *new_path*
    are included; unchanged lines are not.
    """
    with open(old_path, 'r') as fh:
        old_lines = fh.read().splitlines()
    with open(new_path, 'r') as fh:
        new_lines = fh.read().splitlines()
    matcher = difflib.SequenceMatcher(None, old_lines, new_lines, autojunk=False)
    changed = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag != 'equal':
            changed.extend(old_lines[i1:i2])
            changed.extend(new_lines[j1:j2])
    return '\n'.join(changed)


def _mentions_keyword(keyword, text):
    """Return True if *keyword* occurs in *text* not glued to a preceding word.

    Matching is anchored only on the left, so prefix keywords such as
    ``keep_`` still match ``keep_xxx``. ``thresh`` no longer matches inside
    ``mom_thresh`` or ``rthresh``.
    """
    pattern = r'(?<![A-Za-z0-9_])' + re.escape(keyword)
    return re.search(pattern, text) is not None


def check_pipeline_params(configfile=None):
    """
    Checks if any pipeline parameters have been modified since the previous run.

    Compares *configfile* with the backup written by
    ``backup_pipeline_params``. If they differ, it finds the earliest stage in
    ``_PARAM_STAGE_KEYWORDS`` with a keyword in the changed lines. It prints
    those keywords and deletes that stage's ``<stage>.done`` marker from the
    current directory. A missing marker is ignored.

    :param configfile: parameters file to check; defaults to
        ``cgatcore_params['configfile']``.
    """
    if configfile is None:
        configfile = cgatcore_params['configfile']
    backup_file = _backup_path(configfile)
    if not os.access(backup_file, os.R_OK):
        print('No parameters backup file found.')
        print('Skipping parameters change check.')
        return
    # shallow=False: compare contents, not just size/mtime signatures
    if filecmp.cmp(backup_file, configfile, shallow=False):
        return
    print('Parameters in {} have been modified since the last execution.'.format(configfile))
    diff = _changed_lines(backup_file, configfile)
    for stage, keywords in _PARAM_STAGE_KEYWORDS:
        changed_keywords = [kw for kw in keywords if _mentions_keyword(kw, diff)]
        if not changed_keywords:
            continue
        for keyword in changed_keywords:
            print('The {} keyword value has changed in the parameters file since the previous run.'.format(keyword))
        print('Steps from {} onwards will be marked as incomplete.'.format(stage))
        try:
            os.remove(stage + '.done')
        except FileNotFoundError:
            pass
        # only the earliest affected stage is invalidated
        break
# Read cgat-core configuration
cgatcore_params = P.get_parameters("hi_segmented_pipeline.yml")

# Sanity checks on input parameters
input_validation()

# Review and backup pipeline parameters: first compare against the previous
# run's backup, then refresh the backup with the current configuration.
check_pipeline_params()
backup_pipeline_params()

# Put the configured CASA directory at the FRONT of PATH. Then the casa
# checked by input_validation() is the one found by shutil.which() in
# dependency_check and by the "casa" commands the tasks run. If it were
# appended, any other casa already on PATH would shadow it.
_path_entries = [cgatcore_params['casa']]
if os.environ.get("PATH"):
    _path_entries.append(os.environ["PATH"])
os.environ["PATH"] = os.pathsep.join(_path_entries)

# cgat-core logs are sent to both stdout and pipeline.log. Both loggers are
# explicitly kept enabled (disabled = False): setting disabled = True would
# silence every handler of these loggers, pipeline.log included.
# TODO: send cgat-core logging to pipeline.log only.
logging.getLogger("cgatcore.pipeline").disabled = False
logging.getLogger("cgatcore").disabled = False

# start of the cgat-core pipeline
@originate('dependency_check.done')
def dependency_check(outfile):
    """
    Check required dependencies to run the pipeline.

    Verifies that ``casa`` is on PATH and makes each CASA step script
    available in the working directory. A script that is not already
    readable here is symlinked from the configured ``scripts`` directory.
    Creates *outfile* (the ruffus completion marker) on success.

    :param outfile: marker file to create, supplied by ruffus.
    :raises EnvironmentError: if ``casa`` cannot be found on PATH.
    :raises FileNotFoundError: if a script is neither readable locally nor
        present in the scripts directory.
    """
    deps = ["casa"]
    for cmd in deps:
        if shutil.which(cmd) is None:
            raise EnvironmentError("Required dependency \"{}\" not found".format(cmd))
    scripts = ['import_data', 'flag_calib_split', 'dirty_cont_image', 'contsub_dirty_image',
               'clean_image', 'cleanup', 'common_functions', 'moment_zero']
    for script in scripts:
        local_name = script + '.py'
        if os.access(local_name, os.R_OK):
            continue
        # A dangling symlink left by an earlier failed run is unreadable but
        # still occupies the name, which would make os.symlink() raise
        # FileExistsError; remove it so the link can be recreated.
        if os.path.islink(local_name):
            os.unlink(local_name)
        # os.path.join works whether or not 'scripts' ends with a separator.
        os.symlink(os.path.join(cgatcore_params['scripts'], local_name), local_name)
        if not os.access(local_name, os.R_OK):
            # the target does not exist: drop the broken link before failing
            os.unlink(local_name)
            raise FileNotFoundError('{0}.py file is required but not found in the scripts directory ({1}).'.format(script, cgatcore_params['scripts']))
    open(outfile, 'a').close()
@transform(dependency_check, suffix('dependency_check.done'), 'import_data.done')
def import_data(infile, outfile):
    """
    Run the CASA ``import_data.py`` step on the configuration file.

    *outfile* (the ruffus completion marker) is touched only if the casa
    command succeeds, because the two commands are joined with ``&&``.
    """
    statement = 'casa --nologger -c import_data.py {} && touch {}'.format(
        shlex.quote(cgatcore_params['configfile']), shlex.quote(outfile))
    P.execute(statement)
@transform(import_data, suffix('import_data.done'), 'flag_calib_split.done')
def flag_calib_split(infile, outfile):
    """
    Run the CASA ``flag_calib_split.py`` step on the configuration file.

    *outfile* (the ruffus completion marker) is touched only if the casa
    command succeeds, because the two commands are joined with ``&&``.
    """
    statement = 'casa --nologger -c flag_calib_split.py {} && touch {}'.format(
        shlex.quote(cgatcore_params['configfile']), shlex.quote(outfile))
    P.execute(statement)
@transform(flag_calib_split, suffix('flag_calib_split.done'), 'dirty_cont_image.done')
def dirty_cont_image(infile, outfile):
    """
    Run the CASA ``dirty_cont_image.py`` step on the configuration file.

    *outfile* (the ruffus completion marker) is touched only if the casa
    command succeeds, because the two commands are joined with ``&&``.
    """
    statement = 'casa --nologger -c dirty_cont_image.py {} && touch {}'.format(
        shlex.quote(cgatcore_params['configfile']), shlex.quote(outfile))
    P.execute(statement)
@transform(dirty_cont_image, suffix('dirty_cont_image.done'), 'contsub_dirty_image.done')
def contsub_dirty_image(infile, outfile):
    """
    Run the CASA ``contsub_dirty_image.py`` step on the configuration file.

    *outfile* (the ruffus completion marker) is touched only if the casa
    command succeeds, because the two commands are joined with ``&&``.
    """
    statement = 'casa --nologger -c contsub_dirty_image.py {} && touch {}'.format(
        shlex.quote(cgatcore_params['configfile']), shlex.quote(outfile))
    P.execute(statement)
@transform(contsub_dirty_image, suffix('contsub_dirty_image.done'), 'clean_image.done')
def clean_image(infile, outfile):
    """
    Run the CASA ``clean_image.py`` step on the configuration file.

    *outfile* (the ruffus completion marker) is touched only if the casa
    command succeeds, because the two commands are joined with ``&&``.
    """
    statement = 'casa --nologger -c clean_image.py {} && touch {}'.format(
        shlex.quote(cgatcore_params['configfile']), shlex.quote(outfile))
    P.execute(statement)
@transform(clean_image, suffix('clean_image.done'), 'moment_zero.done')
def moment_zero(infile, outfile):
    """
    Run the CASA ``moment_zero.py`` step on the configuration file.

    *outfile* (the ruffus completion marker) is touched only if the casa
    command succeeds, because the two commands are joined with ``&&``.
    """
    statement = 'casa --nologger -c moment_zero.py {} && touch {}'.format(
        shlex.quote(cgatcore_params['configfile']), shlex.quote(outfile))
    P.execute(statement)
@transform(moment_zero, suffix('moment_zero.done'), 'cleanup.done')
def cleanup(infile, outfile):
    """
    Run the CASA ``cleanup.py`` step on the configuration file.

    *outfile* (the ruffus completion marker) is touched only if the casa
    command succeeds, because the two commands are joined with ``&&``.
    """
    statement = 'casa --nologger -c cleanup.py {} && touch {}'.format(
        shlex.quote(cgatcore_params['configfile']), shlex.quote(outfile))
    P.execute(statement)
def main(argv=None):
    """
    Run the pipeline through cgat-core's command-line driver.

    :param argv: command-line arguments; defaults to ``sys.argv``.
    :returns: the value returned by ``P.main``, so it can be used as the
        process exit status.
    """
    if argv is None:
        argv = sys.argv
    return P.main(argv)


if __name__ == "__main__":
    # main() with no arguments forwards sys.argv, exactly as calling
    # P.main(sys.argv) directly did.
    sys.exit(main())