-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSTOMP.py
308 lines (262 loc) · 14.5 KB
/
STOMP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
__all__ = ['STOMP']
import time
import numpy as np
import pybullet as p
from cairo_simulator.core.link import get_link_pairs, get_joint_info_by_name, get_movable_links
from cairo_planning.collisions import self_collision_test, DisabledCollisionsContext
from cairo_planning.local.interpolation import parametric_lerp
class STOMP():
def __init__(self, sim, robot, link_pairs, obstacles,
start_state_config, goal_state_config, N, control_coefficient=0.5,
debug=True, play_pause=False, max_iterations=500, K=5, h=10):
# Sim and robot specific initializations
self.sim = sim
self.robot = robot
# The joint limits are [(-3.0503, 3.0503), (-3.8183, 2.2824), (-3.0514, 3.0514),
# (-3.0514, 3.0514), (-2.9842, 2.9842), (-2.9842, 2.9842), (-4.7104, 4.7104)]
self.robot_id = self.robot.get_simulator_id()
self.dof = len(start_state_config)
self.link_pairs = link_pairs # The link pairs to check self collision for
self.obstacle_ids = obstacles # PyBullet object IDs
# Start and goal state in joint space
self.start_state_config = np.array(start_state_config) if isinstance(start_state_config, list) else start_state_config
self.goal_state_config = np.array(goal_state_config) if isinstance(goal_state_config, list) else goal_state_config
# STOMP specific initializations
self.max_iterations = max_iterations
self.N = N
self.trajectory = parametric_lerp(self.start_state_config, self.goal_state_config, self.N)
self.A = self._init_A()
self.R = np.matmul(self.A.transpose(), self.A)
self.R_inverse = np.linalg.inv(self.R)
self.M = np.copy(self.R_inverse)
self._rescale_M()
self.K = K
self.h = h
self.K_best_noisy_trajectories = None
self.trajectory_cost = np.inf
self.check_distance = 3
self.convergence_diff = 0.01
self.closeness_penalty = 0.05
self.control_coefficient = control_coefficient
# Debugging and visualization specific variables
self.debug = debug
self.trajectory_visualization_ids = []
self.play_pause = play_pause
self.finish_button_value = False
def _init_A(self):
self.A = np.zeros((self.N, self.N))
np.fill_diagonal(self.A, 1)
rng = np.arange(self.N - 1)
self.A[rng + 1, rng] = -2
rng = np.arange(self.N - 2)
self.A[rng + 2, rng] = 1
return self.A
def _rescale_M(self):
scale_factor = np.max(self.M, axis = 0)
self.M = self.M / (self.N * scale_factor)
def print_trajectory(self):
print("######### Trajectory ##########")
print(self.trajectory)
def plan(self):
self.trajectory_cost = self._compute_trajectory_cost()
finish_button_value = False # Only used for interactive visualization
for iteration in range(self.max_iterations):
K_noises, K_noisy_trajectories = self._create_K_noisy_trajectories()
P = self._compute_probabilities(K_noisy_trajectories)
# This function only helps in the visualization of per iteration evolution of the trajectory
# when the self.play_pause flag is set and has nothing to do with the actual planning
self._interactive_planning_visualization()
self._update_trajectory(P, K_noises)
new_cost = self._compute_trajectory_cost()
# TODO: Change stopping criteria to small difference in trajectory
if self.trajectory_cost - new_cost < self.convergence_diff and self.trajectory_cost - new_cost > 0:
print("Feasible path found in {} iterations ...".format(iteration))
return
self.trajectory_cost = new_cost
print("Reached maximum iterations without sufficient cost improvement...")
def get_trajectory(self, time_between_each_waypoint = 5):
return [(time_between_each_waypoint * (i+1), list(waypoint)) for i, waypoint in enumerate(self.trajectory)]
def visualize_trajectory(self, show_only_dot = False):
# The visualization or the forward kinematics overestimates the Z axis for some unknown reason
for i, configuration in enumerate(self.trajectory):
end_effector_world_position = self.robot.solve_forward_kinematics(configuration)[0][0]
if i == 0:
text_string, color = "Start", [0.0,1.0,0.0]
elif i == self.N - 1:
text_string, color = "Goal", [0.0,1.0,0.0]
else:
text_string, color = str(i), [1.0,0.0,0.0]
if show_only_dot:
text_string = "*"
self.trajectory_visualization_ids.append(p.addUserDebugText(text=text_string,
textPosition=end_effector_world_position,
textColorRGB=color, textSize=0.75))
def clear_trajectory_visualization(self):
for i in self.trajectory_visualization_ids:
p.removeUserDebugItem(i)
def _interactive_planning_visualization(self):
if self.play_pause and not self.finish_button_value:
resume_button_id = p.addUserDebugParameter("Resume", 1, 0, 0)
finish_button_id = p.addUserDebugParameter("Finish", 1, 0, 0)
self.visualize_trajectory()
while True:
resume_button_value = bool(p.readUserDebugParameter(resume_button_id) % 2)
self.finish_button_value = bool(p.readUserDebugParameter(finish_button_id))
if resume_button_value or self.finish_button_value:
self.clear_trajectory_visualization()
p.removeAllUserParameters()
time.sleep(1)
break
time.sleep(0.01)
def _state_cost(self, joint_configuration):
return self._collision_cost(joint_configuration)
def _collision_cost(self, joint_configuration):
collision_cost = self._self_collision_cost(list(joint_configuration), self.robot,
self.link_pairs, self.closeness_penalty)
collision_cost += self._obstacle_collision_cost(list(joint_configuration), self.robot,
get_movable_links(self.robot_id), self.obstacle_ids,
self.closeness_penalty)
return collision_cost
def _create_K_noisy_trajectories(self):
# Adding trajectory + noise
# Performing the sampling and update for one joint at a time as mentioned in the literature
K_noisy_trajectories = []
K_noises = []
# TODO: Saving K best noisy trajectories so far
for k in range(self.K):
# single_noise is for one entire trajectory so in the end it will have a shape (waypoints x joints)
single_noise = []
for i in range(self.dof):
# joint_i_noise has length equals to the N (waypoints)
joint_i_noise = np.random.multivariate_normal(np.zeros((self.N)), self.M)
# TODO: Clipping beyond joint limits
single_noise.append(joint_i_noise)
# Transposing because we want rows as waypoints and columns as joints
single_noise = np.array(single_noise).transpose()
# Adding the noise while keeping the start and goal waypoints unchanged
start_state_configuration = self.trajectory[0].copy()
goal_state_configuration = self.trajectory[-1].copy()
# Adding a trajectory shaped noise to the trajectory
single_noisy_trajectory, single_clipped_noise = self._add_and_clip_noise(self.trajectory, single_noise)
# Restoring the start and the goal waypoints because they shouldn't be changed anyways
single_noisy_trajectory[0] = start_state_configuration
single_noisy_trajectory[-1] = goal_state_configuration
K_noisy_trajectories.append(single_noisy_trajectory.copy())
K_noises.append(single_clipped_noise.copy())
return K_noises, K_noisy_trajectories
def _add_and_clip_noise(self, trajectory, single_noise):
single_noisy_trajectory = np.zeros_like(trajectory)
single_clipped_noise = np.zeros_like(trajectory)
for j in range(self.dof):
single_noisy_trajectory[:, j] = np.clip(trajectory[:, j] + single_noise[:, j],
a_min=self.robot._arm_joint_limits[j][0],
a_max=self.robot._arm_joint_limits[j][1])
single_clipped_noise[:, j] = single_noisy_trajectory[:, j] - trajectory[:, j]
return single_noisy_trajectory, single_clipped_noise
def _compute_probabilities(self, K_noisy_trajectories):
S = np.zeros((self.K, self.N))
P = np.zeros((self.K, self.N))
exp_values = np.zeros((self.K, self.N))
with DisabledCollisionsContext(self.sim):
for k in range(self.K):
for i in range(self.N):
S[k][i] = self._state_cost(K_noisy_trajectories[k][i]) + \
self._control_cost(K_noisy_trajectories[k])/self.N
# S[k][i] = self._control_cost(K_noisy_trajectories[k]) / self.N
S_max_for_each_i = np.max(S, axis=0)
S_min_for_each_i = np.min(S, axis=0)
for k in range(self.K):
for i in range(self.N):
exp_values[k][i] = np.exp((self.h*-1*(S[k][i] - S_min_for_each_i[i]))/
(S_max_for_each_i[i] - S_min_for_each_i[i]))
sum_exp_values_for_each_i = np.sum(exp_values, axis=0)
for k in range(self.K):
for i in range(self.N):
P[k][i] = exp_values[k][i]/sum_exp_values_for_each_i[i]
return P
def _update_trajectory(self, P, K_noises):
delta_trajectory = np.zeros_like(self.trajectory)
for i in range(self.N):
delta_trajectory_i = np.zeros((self.dof))
for k in range(self.K):
delta_trajectory_i += P[k][i] * K_noises[k][i]
delta_trajectory[i] = delta_trajectory_i
# Smoothening the delta trajectory updates by projecting onto basis vector R^-1
smoothened_delta_trajectory = np.matmul(self.M, delta_trajectory)
# Updating the trajectories while keeping the start and goal waypoints unchanged
# TODO: Check if this is redundant because the noisy trajectories have the unchanged start and goal
start_state_configuration = self.trajectory[0].copy()
goal_state_configuration = self.trajectory[-1].copy()
self.trajectory += smoothened_delta_trajectory
self.trajectory[0] = start_state_configuration
self.trajectory[-1] = goal_state_configuration
def _obstacle_collision_cost(self, joint_configuration, robot, links, obstacle_ids, closeness_penalty):
robot_id = robot.get_simulator_id()
# Set new configuration and get link states
for i, idx in enumerate(robot._arm_dof_indices):
p.resetJointState(robot._simulator_id, idx, targetValue=joint_configuration[i], targetVelocity=0,
physicsClientId=0)
max_cost = -np.inf
for obstacle in obstacle_ids:
for link in links:
closest_point_object = p.getClosestPoints(bodyA=robot_id, bodyB=obstacle, distance=self.check_distance,
linkIndexA=link, physicsClientId=0)
if closest_point_object:
distance = closest_point_object[0][8]
# print("Obstacle distance = ", distance)
cost = closeness_penalty - distance
if max_cost < cost:
max_cost = cost
# TODO: velocity of the link is not multiplied as in the literature
return max_cost
def _self_collision_cost(self, joint_configuration, robot, link_pairs, closeness_penalty):
"""
Given a joint configuration, finds a cost which is inversely proportional to the minimum distance between all the link pairs
It sets the robot state to the test configuration. The closest distance is calculated for every link pair.
A cost which is inversely proportional to the minimum distance found amongst all the link pairs is returned.
Args:
joint_configuration (list): The joint configuration to test for self-collision
robot (Manipulator Class instance): Manipulator Class instance
TODO: complete args
Returns:
float: The cost of collision
"""
robot_id = robot.get_simulator_id()
# Set new configuration and get link states
for i, idx in enumerate(robot._arm_dof_indices):
p.resetJointState(robot._simulator_id, idx, targetValue=joint_configuration[i], targetVelocity=0,
physicsClientId=0)
max_cost = -np.inf
for link1, link2 in link_pairs:
if link1 != link2:
closest_point_object = p.getClosestPoints(bodyA=robot_id, bodyB=robot_id, distance=self.check_distance,
linkIndexA=link1, linkIndexB=link2, physicsClientId=0)
if closest_point_object:
distance = closest_point_object[0][8]
# print("Link distance = ", distance)
cost = closeness_penalty - distance
if max_cost < cost:
max_cost = cost
return max_cost
def _compute_trajectory_cost(self):
cost = 0
with DisabledCollisionsContext(self.sim):
for i in range(self.N):
cost += self._state_cost(self.trajectory[i])
state_cost = cost
cost += self._control_cost(self.trajectory)
if self.debug:
print("State Cost = ", state_cost)
print("Control Cost = ", cost - state_cost)
print("Total Cost = ", cost)
print("")
return cost
def _control_cost(self, trajectory):
cost = 0.0
# Adding the control cost / acceleration cost for each DOF separately as mentioned in the literature
for j in range(self.dof):
control_cost_for_joint_j = self.control_coefficient * np.matmul(
np.matmul(trajectory[:, j].reshape((1, self.N)), self.R),
trajectory[:, j].reshape((self.N, 1)))
cost += control_cost_for_joint_j[0][0]
return cost