forked from wukui-muc/Offline_RL_Active_Tracking
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDataCollection.py
executable file
·138 lines (118 loc) · 5.34 KB
/
DataCollection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import argparse
import gym_unrealcv
import gym
from gym import wrappers
import cv2
import time
import numpy as np
import os
import torch
from gym_unrealcv.envs.tracking.baseline import PoseTracker
from gym_unrealcv.envs.wrappers import time_dilation, early_done, monitor, agents, augmentation, configUE
import random
from collections import defaultdict
class RandomAgent(object):
"""The world's simplest agent!"""
def __init__(self, action_space):
self.action_space = action_space
self.count_steps = 0
self.action = self.action_space.sample()
def act(self, observation, keep_steps=10):
self.count_steps += 1
if self.count_steps > keep_steps:
self.action = self.action_space.sample()
self.count_steps = 0
else:
return self.action
return self.action
def reset(self):
self.action = self.action_space.sample()
self.count_steps = 0
if __name__ == '__main__':
parser = argparse.ArgumentParser(description=None)
parser.add_argument("-e", "--env_id", default='UnrealTrackGeneral-FlexibleRoom-ContinuousColor-v0',
help='Select the environment to run')
parser.add_argument("-r", '--render', dest='render', action='store_true', help='show env using cv2')
parser.add_argument("-s", '--seed', dest='seed', default=1, help='random seed')
parser.add_argument("-t", '--time-dilation', dest='time_dilation', default=10, help='time_dilation to keep fps in simulator')
parser.add_argument("-n", '--nav-agent', dest='nav_agent', action='store_true', help='use nav agent to control the agents')
parser.add_argument("-d", '--early-done', dest='early_done', default=100, help='early_done when lost in n steps')
parser.add_argument("-m", '--monitor', dest='monitor', action='store_true', help='auto_monitor')
args = parser.parse_args()
env = gym.make(args.env_id)
if int(args.time_dilation) > 0: # -1 means no time_dilation
env = time_dilation.TimeDilationWrapper(env, int(args.time_dilation))
if int(args.early_done) > 0: # -1 means no early_done
env = early_done.EarlyDoneWrapper(env, int(args.early_done))
if args.monitor:
env = monitor.DisplayWrapper(env)
env = augmentation.RandomPopulationWrapper(env, 2, 2, random_target=False)
# env = configUE.ConfigUEWrapper(env, offscreen=False,resolution=(160,160))
env = agents.NavAgents(env, mask_agent=False)
env.seed(int(args.seed))
episode_count = 100
rewards = 0
done = False
Total_rewards = 0
try:
for eps in range(1, episode_count):
image = []
action = []
reward = []
info_list =[]
obs = env.reset()
agents_num = len(env.action_space)
tracker = PoseTracker(env.action_space[0], 200, 0) # TODO support multi trackers
tracker_random = RandomAgent(env.action_space[0])
count_step = 0
t0 = time.time()
agents_num = len(obs)
C_rewards = np.zeros(agents_num)
actions=[[0,0],[0,0]]
flag=0
env.unwrapped.reward_params['exp_distance'] = 250
env.unwrapped.reward_params['exp_angle'] = 0
while True:
obs, rewards, done, info = env.step(actions)
target_pose = env.unwrapped.unrealcv.get_obj_pose(
env.unwrapped.player_list[env.unwrapped.target_id])
tracker_pose=env.unwrapped.unrealcv.get_obj_pose(
env.unwrapped.player_list[env.unwrapped.tracker_id])
actions = [np.array(tracker.act(tracker_pose, target_pose))]
flag -= 1
if random.random() < 0.2 or flag > 0:
actions[0] = tracker_random.act(obs,keep_steps=3)
if flag <= 0:
action_tmp = actions[0]
flag = random.randint(2, 3)
else:
actions[0] = action_tmp
image.append(obs[env.unwrapped.tracker_id])
action.append([actions[0]])
reward.append(rewards)
info_list.append(info)
C_rewards += rewards
count_step += 1
cv2.imshow('rgb', (obs[0][:, :, 0:3].astype(np.uint8)))
cv2.waitKey(1)
if done:
fps = count_step/(time.time() - t0)
Total_rewards += C_rewards[0]
print ('Fps:' + str(fps), 'R:'+str(C_rewards), 'R_ave:'+str(Total_rewards/eps))
dict = {
'action': action,
'image': image,
'reward': reward,
'info': info_list,
}
save_dir = os.path.join(
'path/imperfect_' + "%04d" % int(eps) + "-%03d" % count_step + '.pt')
if count_step==500:
torch.save(dict, save_dir)
break
# Close the env and write monitor result info to disk
print('Finished')
env.close()
except KeyboardInterrupt:
print('exiting')
env.close()