forked from squared9/Crime_Detection
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerator.py
More file actions
113 lines (101 loc) · 5.18 KB
/
generator.py
File metadata and controls
113 lines (101 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import numpy as np
from keras.utils import Sequence
from coordinates import normalize_coordinates
from augmentation import augment_coordinates
from configuration import coordinate_encoding, number_of_coordinates, number_of_frames, NORMALIZE_ASPECT_RATIO, NORMALIZE_COORDINATES
from coordinates import convert_coordinates, normalize_coordinates
from augmentation import augment_coordinates
class Single_Track_Generator(Sequence):
    """
    Keras Sequence that serves batches of fixed-length coordinate windows.

    Every sequence is cut into all windows of `number_of_frames` consecutive steps;
    each window is one training sample. A batch is returned as the same tensor stacked
    twice (see __getitem__).
    """

    def __init__(self, sequences, sequence_boundaries, track_dimensions, number_of_frames, batch_size=1,
                 randomized=True, randomize_positions=False, mean_scale_factor=1.0, p_horizontal_flip=0.5):
        """
        Initializes generator

        Arguments:
            sequences -- training sequences dictionary, sequence_id -> [(coordinates, bounding box) for each step]
            sequence_boundaries -- dictionary with bounding boxes of whole sequences, sequence_id -> bounding box
            track_dimensions -- dictionary sequence_id -> track dimensions; element [2] is used as the
                                aspect ratio when NORMALIZE_ASPECT_RATIO is set
            number_of_frames -- number of frames for a single training sample, e.g. 300 frames
            batch_size -- training batch size
            randomized -- if true, order of sequences is randomized in each epoch
            randomize_positions -- forwarded unchanged to augment_coordinates
            mean_scale_factor -- forwarded unchanged to augment_coordinates
            p_horizontal_flip -- forwarded unchanged to augment_coordinates
        """
        # Newer Keras versions expect the base class initializer to run; it is harmless on older ones.
        super().__init__()
        self.sequences = sequences
        self.sequence_boundaries = sequence_boundaries
        self.track_dimensions = track_dimensions
        self.number_of_frames = number_of_frames
        # TODO: allow/test batch_size > 1
        self.batch_size = batch_size
        self.ids = list(sequences.keys())
        # self.dimension = len(sequences[self.ids[0]][0][0]) // 2  # length of coordinates
        self.dimension = number_of_coordinates
        self.randomized = randomized
        if self.randomized:
            self.ids = self._shuffled_ids()
        self.randomize_positions = randomize_positions
        self.mean_scale_factor = mean_scale_factor
        self.p_horizontal_flip = p_horizontal_flip
        self.index = 0
        self.length = None  # number of batches per pass; computed lazily by __len__
        self.steps = {}  # index to sequence_id + offset for each training step

    def _shuffled_ids(self):
        """
        Returns the sequence ids in a random order.

        Indices are permuted rather than the ids themselves so that the ids keep their
        original Python type and remain valid dictionary keys (permuting the ids directly
        would coerce them into a NumPy array).
        """
        order = np.random.permutation(len(self.ids))
        return [self.ids[k] for k in order]

    def _build_steps(self):
        """
        Rebuilds the sample index (self.steps) and the batch count (self.length).
        """
        steps = {}
        offset = 0
        for sequence_id in self.ids:
            # A sequence shorter than one window yields no samples; without the clamp the
            # negative count would be subtracted from the total.
            number_of_subsequences = max(0, len(self.sequences[sequence_id]) - self.number_of_frames + 1)
            # for each possible subsequence of a sequence add a new (id, starting step) record
            # to allow random batch retrieval with arbitrary size
            for start in range(number_of_subsequences):
                steps[offset] = (sequence_id, start)  # (sequence_id, offset)
                offset += 1
        self.steps = steps
        self.length = offset // self.batch_size  # skip last incomplete batch

    def __len__(self):
        """
        Returns number of batches during a single full pass
        """
        if self.length is None:
            self._build_steps()
        return self.length

    def __getitem__(self, idx):
        """
        Receives a new training batch with index idx

        Returns an array of shape (2, batch_size, number_of_frames, 2, dimension) holding the
        batch twice (presumably model input and reconstruction target -- confirm with the caller).
        """
        if self.length is None:
            # The sample index is built lazily; make sure it exists when a batch is
            # requested before len() was ever called.
            self._build_steps()
        x = np.ndarray((self.batch_size, self.number_of_frames, 2, self.dimension), dtype=np.float32)
        first = idx * self.batch_size
        for j in range(self.batch_size):
            # every sample of the batch has its own (sequence, starting step) record
            sequence_id, start = self.steps[first + j]
            frames = self.sequences[sequence_id]
            aspect_ratio = 1.0
            if NORMALIZE_ASPECT_RATIO:
                aspect_ratio = self.track_dimensions[sequence_id][2]
            for i in range(self.number_of_frames):
                # frame i of the window lies i steps after the window start
                coordinates = frames[start + i][0][:]
                # NOTE(review): augmentation is invoked once per frame, so any random choice
                # made inside augment_coordinates may differ between frames of one window -- confirm intended.
                coordinates = self.perform_augmentation(sequence_id, coordinates)
                coordinates = convert_coordinates(coordinates, coordinate_encoding, aspect_ratio)
                if NORMALIZE_COORDINATES:
                    coordinates = normalize_coordinates(coordinates,
                                                        coordinate_encoding,
                                                        self.sequence_boundaries[sequence_id],
                                                        self.track_dimensions[sequence_id],
                                                        prefer_track_dimension=True,
                                                        )
                x[j, i, :, :] = np.reshape(coordinates, (2, self.dimension))
        return np.array([x, x])

    def perform_augmentation(self, sequence_id, coordinates):
        """
        Augments coordinates for training

        Arguments:
            sequence_id -- id of the sequence the coordinates belong to (currently not used)
            coordinates -- all x coordinates followed by all y coordinates
        """
        return augment_coordinates(coordinates,
                                   randomize_positions=self.randomize_positions,
                                   mean_scale_factor=self.mean_scale_factor,
                                   p_horizontal_flip=self.p_horizontal_flip
                                   )

    def on_epoch_end(self):
        """
        Called on epoch end. If randomized is set to true, it shuffles sequence IDs for the next pass
        """
        if self.randomized:
            self.ids = self._shuffled_ids()
        self.index = 0
        # the sample index depends on the order of ids, so rebuild it right away
        self._build_steps()