-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathzip.py
More file actions
162 lines (127 loc) · 4.61 KB
/
Copy pathzip.py
File metadata and controls
162 lines (127 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python
"""
Zip all converted/downloaded/cleaned light curves of one or more SNe into a single file or multiple files.
@author: Sofia Rest
"""
import argparse
import os
import sys
from typing import Dict, List
import zipfile
from download import load_config
# Extensions of the files that may be added to a zip archive; files with any
# other ending are skipped when a directory is zipped.
ALLOWED_EXTENSIONS = {
    # text and documentation
    ".txt",
    ".md",
    ".pdf",
    # images
    ".jpg",
    ".png",
    # code and notebooks
    ".py",
    ".ipynb",
    # settings and structured data
    ".json",
    ".ini",
}
def define_args(parser=None, usage=None, conflict_handler="resolve"):
    """
    Build (or extend) the command-line argument parser of the zip script.

    If ``parser`` is None, a new ``argparse.ArgumentParser`` is created from ``usage``
    and ``conflict_handler``; otherwise the arguments are added to the given parser
    and those two parameters are not used.

    Registers the positional ``tnsnames`` (one or more), ``--config_file``
    (default ``config.ini``) and the ``-b/--bulk`` flag, then returns the parser.
    """
    if parser is None:
        parser = argparse.ArgumentParser(usage=usage, conflict_handler=conflict_handler)

    # The help text previously said "to download from ATLAS", which described a
    # different script; this one zips files that already exist on disk.
    parser.add_argument(
        "tnsnames",
        nargs="+",
        help="TNS names of the objects whose light curve files should be zipped",
    )
    parser.add_argument(
        "--config_file",
        default="config.ini",
        type=str,
        help="file name of .ini file with settings for this class",
    )
    parser.add_argument(
        "-b",
        "--bulk",
        default=False,
        action="store_true",
        help="store multiple SN files within one zip file",
    )
    return parser
def get_in_dirnames(tnsname: str, input_dir: str, output_dir: str) -> List[str]:
    """
    Return the two directories that hold the files of one SN: the SN's
    subdirectory (named after its TNS name) inside the ATLAS input directory,
    followed by the one inside the output directory.
    """
    parent_dirs = (input_dir, output_dir)
    return [f"{parent_dir}/{tnsname}" for parent_dir in parent_dirs]
def get_out_filename(tnsname, output_dir) -> str:
    """
    Build the path of the zip file for a single SN: ``<output_dir>/<tnsname>.zip``.
    """
    return "{}/{}.zip".format(output_dir, tnsname)
def is_file_allowed(filename: str) -> bool:
    """
    Tell whether ``filename`` ends with one of the extensions in ``ALLOWED_EXTENSIONS``.
    """
    # str.endswith accepts a tuple of suffixes and checks them all in one call.
    return filename.endswith(tuple(ALLOWED_EXTENSIONS))
def get_files_from(dirname) -> Dict[str, str]:
    """
    Recursively collect every file found under ``dirname``.

    Returns a dictionary that maps the path of each file on disk (as built from
    the ``os.walk`` results) to the path it should get inside an archive: the
    name of ``dirname`` itself followed by the file's path relative to ``dirname``.
    A directory that does not exist or contains no files yields an empty dictionary.
    """
    # Normalize before taking the basename: with a trailing separator
    # ("in/2020lse/") basename() would return "" and the directory prefix would
    # silently disappear from every archive path.
    directory_name = os.path.basename(os.path.normpath(dirname))

    all_files = {}
    for root, _, files in os.walk(dirname):
        for file in files:
            original_path = os.path.join(root, file)
            all_files[original_path] = os.path.join(
                directory_name, os.path.relpath(original_path, start=dirname)
            )
    return all_files
def get_allowed_files_from(dirname) -> List[str]:
    """
    List the names of the entries directly inside ``dirname`` that have an
    allowed extension and are not directories.
    """
    return [
        entry
        for entry in os.listdir(dirname)
        if is_file_allowed(entry) and not os.path.isdir(os.path.join(dirname, entry))
    ]
def new_zipfile(out_filename) -> zipfile.ZipFile:
    """
    Open ``out_filename`` as a deflate-compressed zip archive in append mode
    and return the open ``ZipFile`` object.
    """
    archive = zipfile.ZipFile(out_filename, "a", zipfile.ZIP_DEFLATED)
    return archive
def zip_directory(zf: zipfile.ZipFile, in_dirnames: List[str]):
    """
    Add the files found under each directory in ``in_dirnames`` to the open archive ``zf``.

    Files without an allowed extension are not added; a "# Skipping" line is
    printed for each of them instead. Returns the same ``zf`` that was passed in.
    """
    for in_dirname in in_dirnames:
        for original_path, relative_path in get_files_from(in_dirname).items():
            if is_file_allowed(relative_path):
                zf.write(original_path, relative_path)
            else:
                print(f"# Skipping {original_path}")
    return zf
def zip_sne_in_bulk(
    tnsnames: List[str], input_dir: str, output_dir: str, out_filename: str
):
    """
    Zip the input and output directories of several SNe into one archive.

    :param tnsnames: TNS names of the SNe to include.
    :param input_dir: directory containing one input subdirectory per SN.
    :param output_dir: directory containing one output subdirectory per SN.
    :param out_filename: path of the zip file to create.
    :raises ValueError: if ``out_filename`` already exists.
    """
    check_that_out_zipfile_does_not_exist(out_filename)

    # The context manager closes (and thereby finalizes) the archive even when
    # zipping one of the SNe raises, so the file handle is never leaked.
    with new_zipfile(out_filename) as zf:
        for tnsname in tnsnames:
            print(f"\n📦 Zipping {tnsname} into {out_filename}...")
            zip_directory(zf, get_in_dirnames(tnsname, input_dir, output_dir))

    print("✅ Success")
def zip_single_sn(tnsname: str, input_dir: str, output_dir: str, out_filename: str):
    """
    Zip the input and output directories of a single SN into its own archive.

    :param tnsname: TNS name of the SN.
    :param input_dir: directory containing the SN's input subdirectory.
    :param output_dir: directory containing the SN's output subdirectory.
    :param out_filename: path of the zip file to create.
    :raises ValueError: if ``out_filename`` already exists.
    """
    print(f"\n📦 Zipping {tnsname} into {out_filename}...")
    check_that_out_zipfile_does_not_exist(out_filename)
    in_dirnames = get_in_dirnames(tnsname, input_dir, output_dir)

    # Use the archive as a context manager so it is closed (and its central
    # directory written) even if adding a file raises.
    with new_zipfile(out_filename) as zf:
        zip_directory(zf, in_dirnames)

    print("✅ Success")
def check_that_out_zipfile_does_not_exist(out_filename: str):
    """
    Guard against reusing an existing archive path.

    Returns None when nothing exists at ``out_filename``; raises ``ValueError``
    when something already does.
    """
    if not os.path.exists(out_filename):
        return

    raise ValueError(
        f"Error: zip file '{out_filename}' already exists. Please delete or rename it first."
    )
if __name__ == "__main__":
    # Read the command line: which transients to zip and whether to bundle them.
    args = define_args().parse_args()
    tnsnames = args.tnsnames
    bulk = args.bulk
    print(
        f"List of transients to zip {'in bulk' if bulk else 'individually'}: {tnsnames}"
    )

    # Look up the directories to zip from in the config file.
    config = load_config(args.config_file)
    input_dir = config["dir"]["atclean_input"]
    output_dir = config["dir"]["output"]
    print(f"📁 ATClean input directory: {input_dir}")
    print(f"📁 Output directory: {output_dir}")

    if not bulk:
        # One archive per transient, named after its TNS name.
        for tnsname in tnsnames:
            out_filename = get_out_filename(tnsname, output_dir)
            zip_single_sn(tnsname, input_dir, output_dir, out_filename)
    else:
        # A single archive holding every requested transient.
        out_filename = f"{output_dir}/lcs.zip"
        zip_sne_in_bulk(tnsnames, input_dir, output_dir, out_filename)