#!/usr/bin/env python
# coding: utf-8
import argparse
import numpy as np
import pandas as pd
import cv2
import os
from pathlib import Path
from ultralytics import YOLO
from glob import glob
import re
def _confidence(value):
    """argparse ``type`` callback: parse a confidence cutoff in [0, 1].

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a number or falls
            outside the closed interval [0, 1] (NaN is rejected too).
    """
    try:
        conf = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError("%r is not a number" % (value,))
    # chained comparison is False for NaN, so NaN is rejected as well
    if not 0.0 <= conf <= 1.0:
        raise argparse.ArgumentTypeError(
            "confidence cutoff must be between 0 and 1, got %s" % value)
    return conf


def options(argv=None):
    """Build the command-line parser and parse the arguments.

    Args:
        argv: optional list of argument strings. When None (the default),
            argparse reads ``sys.argv[1:]``, which is what the script
            entry point relies on.

    Returns:
        argparse.Namespace with attributes ``img``, ``weights``, ``output``,
        ``key``, ``annotated`` and ``conf``.

    Raises:
        SystemExit: raised by argparse on invalid or missing arguments,
            including a ``--conf`` value outside [0, 1].
    """
    parser = argparse.ArgumentParser(description="Nematode egg image processing with YOLOv8 model.")
    parser.add_argument("-i", "--img", help="Target image directory or image (REQUIRED)", required=True)
    parser.add_argument('-w', '--weights', help='Weights file for use with YOLO11 model')
    parser.add_argument("-o", "--output", help="Name of results file. If no file is specified, one will be created from the key file name")
    parser.add_argument("-k", "--key", help="CSV key file to use as output template. If no file is specified, will look for one in target directory. Not used in single-image mode")
    parser.add_argument("-a", "--annotated", help="Directory to save annotated image files", required=False)
    # validated through _confidence so nonsensical cutoffs fail at parse time
    parser.add_argument("--conf", help="Confidence cutoff (default = 0.6)", default=0.6, type=_confidence)
    return parser.parse_args(argv)
# TODO - maybe rework this from a function to custom argparse.Action() subclasses?
def check_args():
    """Parse the command line, validate it, and derive the paths the run needs.

    The namespace returned by ``options()`` is extended with:

    * ``imgpath``  - ``Path`` built from ``--img``
    * ``img_mode`` - ``'file'`` for a single image, ``'dir'`` for a directory
    * ``weights``  - falls back to ``weights.pt`` next to this script
    * ``xy_mode``  - True when the target directory contains ``XY00``-style
      subdirectories; always False in single-image mode
    * ``subdir_paths`` (XY mode) or ``subimage_paths`` (flat image directory)
    * ``keypath`` / ``key`` - the key CSV; only set in XY mode, otherwise
      ``keypath`` is None and ``key`` is reset to None
    * ``outpath`` / ``output`` - results CSV (directory mode only)
    * ``annotpath`` - created if missing, when ``--annotated`` is given

    Returns:
        The augmented argparse.Namespace.

    Raises:
        Exception: if the target, weights, key, output, or annotated
            directory arguments are missing or invalid.
    """
    image_suffixes = ('.tif', '.tiff', '.jpg', '.jpeg', '.png')
    args = options()

    # basic checks on target file validity
    args.imgpath = Path(args.img)
    if not args.imgpath.exists():
        raise Exception("Target %s is not a valid path" % args.img)
    if args.imgpath.is_file():
        args.img_mode = 'file'
        if args.imgpath.suffix.lower() not in image_suffixes:
            raise Exception('Target image %s must of type .png, .tif, .tiff, .jpeg, or .jpg' % args.img)
    elif args.imgpath.is_dir():
        args.img_mode = 'dir'
    else:
        raise Exception('Target %s does not appear to be a file or directory.' % args.img)

    # if no weights file, try using the default weights.pt
    if not args.weights:
        default_weights = Path(__file__).parent / 'weights.pt'
        if not default_weights.exists():
            raise Exception('No weights file specified and default weights.pt not found in script directory')
        args.weights = str(default_weights)

    # defaults so these attributes exist in every mode, including single-image
    args.xy_mode = False
    args.keypath = None

    if args.img_mode == 'dir':
        # check if subdirectories of format XY00/ exist or if we're running on just a dir of images
        # is_dir() is checked explicitly: a trailing "/" in the glob pattern only
        # restricts matches to directories on newer Python versions
        subdirs = sorted(p for p in args.imgpath.glob('XY[0-9][0-9]') if p.is_dir())
        if not subdirs:
            print("No subdirectories of format /XY../ found in specified imgdir, checking for images...")
            potential_images = [x for x in args.imgpath.iterdir()
                                if x.is_file() and x.suffix.lower() in image_suffixes]
            if not potential_images:
                raise Exception('No valid images (.png, .tif, .tiff, .jpeg, .jpg) in target folder %s' % args.img)
            print('%s valid images found' % len(potential_images))
            args.subimage_paths = potential_images
        else:
            args.xy_mode = True
            args.subdir_paths = subdirs

        # for /XY00/ subdirectories, we require a valid key
        # ensure that either a key is specified, or if a single .csv exists in the target dir, use that
        if args.xy_mode:
            if args.key:
                args.keypath = Path(args.key)
                if not args.keypath.exists():
                    raise Exception('Specified key file does not exist: %s' % args.keypath)
                if args.keypath.suffix.lower() != '.csv':
                    raise Exception("Specified key file is not a .csv: %s" % args.keypath)
            else:
                print('Running on /XY00/ subdirectories but no key specified. Looking for key file...')
                potential_keys = list(args.imgpath.glob('*.csv'))
                if not potential_keys:
                    raise Exception("No .csv files found in target folder %s, please check directory" % args.img)
                if len(potential_keys) > 1:
                    # the format argument was previously missing, leaving a literal "%s"
                    raise Exception("Multiple .csv files found in target folder %s, please specify which one to use" % args.img)
                args.keypath = potential_keys[0]
                args.key = str(potential_keys[0])

        # if path to results file is specified, ensure it is .csv
        if args.output:
            args.outpath = Path(args.output)
            if args.outpath.suffix.lower() != '.csv':
                raise Exception("Specified output file is not a .csv: %s" % args.outpath)
        else:
            # for XY00 subdirs, name it after the required key file
            # for an image directory, name it after the directory
            stem = args.keypath.stem if args.xy_mode else args.imgpath.stem
            args.output = '%s_eggcounts.csv' % stem
            args.outpath = Path(args.output)

    # the key is only meaningful with /XY00/ subdirectories; clear it otherwise
    # so callers testing args.key never reach for a keypath that was not set
    if args.key and not args.xy_mode:
        print('Key file is only used with /XY00/ subdirectories, ignoring %s' % args.key)
        args.key = None

    # finally, check the target dir to save annotated images in
    if args.annotated:
        args.annotpath = Path(args.annotated)
        if not args.annotpath.exists():
            # parents=True so a nested output directory can be created in one go
            args.annotpath.mkdir(parents=True, exist_ok=True)
        elif not args.annotpath.is_dir():
            raise Exception("annotated output folder is not a valid directory: %s" % args.annotated)
    return args
# parse a key file, make sure it all looks correct and can be merged later
def parse_key_file(keypath):
    """Read a plate key CSV and add a normalised ``keycol`` well column.

    Exactly one text column must consist of well positions such as
    ``A1``/``A01``/``H12``. Its values are copied into a new ``keycol``
    column formatted like ``_A01`` (underscore, row letter, zero-padded
    column number).

    Args:
        keypath: path to the key CSV, passed to ``pandas.read_csv``.

    Returns:
        pandas.DataFrame: the key, minus any ``Unnamed`` columns, plus ``keycol``.

    Raises:
        Exception: if the key has more than 96 rows, has no or several
            well-position columns, or has blank or duplicated wells.
    """
    key = pd.read_csv(keypath)
    # drop potential Unnamed: 0 column if rownames from R were included without col header
    key = key.loc[:, ~key.columns.str.contains(r'^Unnamed')].copy()
    # for now, reject keys with more than 96 rows
    if key.shape[0] > 96:
        raise Exception("More than 96 rows found in key. Please check formatting and try again")
    # check if it's got at least one column formatted with what looks like plate positions
    well_columns = []
    for col in key.columns:
        if key[col].dtype.kind != "O":
            continue
        # na=True: blank cells do not disqualify the column here, so they get
        # reported by the dedicated blank-well check below instead
        matches = key[col].str.fullmatch(r"[A-H][0-9]{1,2}", na=True)
        if bool(matches.all()):
            well_columns.append(col)
    if len(well_columns) == 0:
        raise Exception("No column found with well positions of format A1/A01/H12/etc.")
    elif len(well_columns) > 1:
        raise Exception("Multiple columns found with well positions of format A1/A01/H12/etc.")
    well_col = well_columns[0]
    # as the key, it should really be unique and complete, raise exception if not the case
    if key[well_col].isna().any():
        raise Exception("There appear to be blank well positions in column %s. Please fix and resubmit." % well_col)

    def _normalise(well):
        # if formatted A1, reformat as A01, and prefix with "_"
        row, number = re.fullmatch(r"([A-H])([0-9]{1,2})", well).groups()
        return "_%s%s" % (row, number.zfill(2))

    # add a column named keycol, formatted to match the folder output like _A01
    key["keycol"] = key[well_col].apply(_normalise)
    # uniqueness is checked AFTER normalising so that "A1" and "A01" in the
    # same key are recognised as the same well
    if key["keycol"].duplicated().any():
        raise Exception("There appear to be duplicated well positions in the key file. Please fix and resubmit.")
    return key
def _detect_eggs(model, img, conf):
    """Run the model on one image and return the boxes classified as 'egg'.

    Returns:
        list of int32 numpy arrays ``[x1, y1, x2, y2]``, one per egg.
    """
    # max_det=1000 is used in every mode; the directory modes used to omit it
    # and so fell back to the library's default detection cap, unlike
    # single-image mode
    results = model.predict(img, imgsz=1440, max_det=1000, verbose=False, conf=conf)
    result = results[0]
    # NOTE - filtering by class is not necessary, but would make this easier to extend to multi-class models
    # e.g. if we want to add hatched, empty eggs, etc
    egg_xy = []
    for cls_id, box in zip(result.boxes.cls, result.boxes.xyxy):
        if result.names[int(cls_id)] == 'egg':
            # .cpu() first: .numpy() alone fails when the tensor lives on a GPU
            egg_xy.append(box.cpu().numpy().astype(np.int32))
    return egg_xy


def _save_annotated(img, egg_xy, annot_path):
    """Draw a red rectangle around each egg on a copy of img and write it to annot_path."""
    annot = img.copy()
    for x1, y1, x2, y2 in egg_xy:
        # plain Python ints keep cv2 happy regardless of the numpy integer type
        cv2.rectangle(annot, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 4)
    cv2.imwrite(str(annot_path), annot)


def _find_well_name(subdir):
    """Return the well token (formatted like ``_A01``) named by an entry of subdir, or None.

    The token is zero-padded the same way as the key file's ``keycol`` so
    that ``_A1`` and ``_A01`` both merge against the key.
    """
    for entry in sorted(subdir.iterdir()):
        found = re.match(r"_([A-H])([0-9]{1,2})", entry.name)
        if found:
            return "_%s%s" % (found.group(1), found.group(2).zfill(2))
    return None


def main():
    """Count eggs in a single image, a flat image directory, or /XY00/ subdirectories.

    Single-image mode prints the count (and optionally saves an annotated
    copy). Directory modes write a CSV to ``args.outpath``; in XY mode the
    counts are left-merged onto the key file by well position.

    Raises:
        Exception: if the single target image cannot be read, or an XY
            subdirectory has no well-position marker file.
    """
    args = check_args()
    # the key is only needed (and keypath only guaranteed) for /XY00/ subdirectories
    xy_mode = args.img_mode == 'dir' and args.xy_mode
    key = parse_key_file(str(args.keypath)) if xy_mode else None
    model = YOLO(args.weights)

    # single-image mode
    if args.img_mode == 'file':
        img = cv2.imread(str(args.imgpath))
        if img is None:
            # cv2.imread signals failure by returning None instead of raising
            raise Exception("Could not read image: %s" % args.imgpath)
        egg_xy = _detect_eggs(model, img, args.conf)
        print('Target image:\n%s' % str(args.imgpath))
        print('n eggs:\n%s' % len(egg_xy))
        if args.annotated:
            annot_path = args.annotpath / ('%s_annotated%s' % (args.imgpath.stem, args.imgpath.suffix))
            _save_annotated(img, egg_xy, annot_path)
            print('Saving annotations to %s...' % str(annot_path))
        return

    # multi-image mode, runs differently depending on whether you have /XY00/ subdirectories
    # plain lists for holding results, easier than adding to an empty Pandas DF
    tmp_well = []
    tmp_numeggs = []
    tmp_filenames = []
    if xy_mode:
        for subdir in args.subdir_paths:
            # check that the empty file with well name is present
            well = _find_well_name(subdir)
            if well is None:
                raise Exception("No well position file of format _A01 found in subdirectory:\n%s" % subdir)
            # print the XY subdirectory name for tracking purposes
            print(subdir.name)
            # search for a filename with CH4 in it
            # TODO - confirm with sweetpotato group that the CH4.tif or CH4.jpg will be present in all cases
            candidate_img_paths = list(subdir.glob('*CH4*'))
            # if none or more than one, just skip the folder vs raise exceptions
            if len(candidate_img_paths) == 0:
                print("No CH4 image found for subdirectory %s" % subdir)
                continue
            elif len(candidate_img_paths) > 1:
                print("Multiple CH4 images found in subdirectory %s" % subdir)
                continue
            impath = candidate_img_paths[0]
            img = cv2.imread(str(impath))
            if img is None:
                print("Could not read image %s, skipping" % impath)
                continue
            egg_xy = _detect_eggs(model, img, args.conf)
            # append relevant output to temporary lists
            tmp_well.append(well)
            tmp_numeggs.append(len(egg_xy))
            tmp_filenames.append(impath.name)
            # annotate and save image if needed
            if args.annotated:
                annot_path = args.annotpath / ('%s_annotated%s' % (impath.stem, impath.suffix))
                _save_annotated(img, egg_xy, annot_path)
        # make a table to merge with the key
        results = pd.DataFrame({
            "keycol": tmp_well,
            "num_eggs": tmp_numeggs,
            "filename": tmp_filenames,
            "folder": args.img})
        # left merge keeps every key row, including wells with no result
        outdf = key.merge(results, on="keycol", how="left")
        outdf = outdf.drop("keycol", axis=1)
    else:
        # apply the model on each image
        # running model() on the target dir instead of image-by-image would be cleaner
        # but makes saving annotated images more complicated
        for impath in sorted(args.subimage_paths):
            img = cv2.imread(str(impath))
            if img is None:
                print("Could not read image %s, skipping" % impath)
                continue
            egg_xy = _detect_eggs(model, img, args.conf)
            tmp_numeggs.append(len(egg_xy))
            tmp_filenames.append(impath.name)
            # annotate if needed
            if args.annotated:
                annot_path = args.annotpath / ('%s_annotated%s' % (impath.stem, impath.suffix))
                _save_annotated(img, egg_xy, annot_path)
        outdf = pd.DataFrame({
            'folder': args.imgpath,
            "filename": tmp_filenames,
            "num_eggs": tmp_numeggs})

    # save final pandas df, print some updates for user
    outdf = outdf.sort_values(by='filename')
    outdf.to_csv(str(args.outpath), index=False)
    print('Saving output to %s...' % str(args.outpath))
    if args.annotated:
        print('Saving annotated images to %s...' % str(args.annotpath))
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()