mirror of
https://github.com/Mobile-Robotics-W20-Team-9/UMICH-NCLT-SLAP.git
synced 2025-09-08 04:03:14 +00:00
control part
This commit is contained in:
104
Astar.py
Normal file
104
Astar.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import sys
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import scipy.interpolate
|
||||
import math
|
||||
import random
|
||||
import time
|
||||
import heapq # https://docs.python.org/3/library/heapq.html
|
||||
|
||||
|
||||
def load_poses(pose_gt_file) :
|
||||
pose_gt = np.loadtxt(pose_gt_file, delimiter = ",")
|
||||
|
||||
return pose_gt
|
||||
|
||||
|
||||
def plot_position(pose_gt):
|
||||
# NED (North, East, Down)
|
||||
x = pose_gt[:, 1]
|
||||
y = pose_gt[:, 2]
|
||||
z = pose_gt[:, 3]
|
||||
|
||||
plt.figure()
|
||||
plt.scatter(y, x, 1, c=-z, linewidth=2) # Note Z points down
|
||||
plt.axis('equal')
|
||||
plt.title('Ground Truth Position of Nodes in SLAM Graph')
|
||||
plt.xlabel('East (m)')
|
||||
plt.ylabel('North (m)')
|
||||
plt.colorbar()
|
||||
|
||||
plt.show()
|
||||
|
||||
return
|
||||
|
||||
|
||||
class PriorityQueue:
|
||||
def __init__(self):
|
||||
self.elements = []
|
||||
|
||||
def empty(self):
|
||||
return len(self.elements) == 0
|
||||
|
||||
def put(self, item, priority):
|
||||
heapq.heappush(self.elements, (priority, item))
|
||||
|
||||
def get(self):
|
||||
return heapq.heappop(self.elements)[1]
|
||||
|
||||
|
||||
def find_path(cur_node, parent_idx, start_idx):
|
||||
next_idx = cur_node
|
||||
path = [next_idx]
|
||||
|
||||
while next_idx != start_idx:
|
||||
next_idx = parent_idx[next_idx]
|
||||
path.append(next_idx)
|
||||
|
||||
return path[::-1]
|
||||
|
||||
|
||||
def Astar(start_idx, goal_idx, poses, k=20):
|
||||
visit_queue = PriorityQueue()
|
||||
visited_flag, queueed_flag = np.zeros(poses.shape[0]), np.zeros(poses.shape[0])
|
||||
g_score, h_score = np.full(poses.shape[0], np.inf), np.full(poses.shape[0], np.inf)
|
||||
parent_idx = np.zeros(poses.shape[0], dtype='int')
|
||||
|
||||
test_tree = scipy.spatial.KDTree(poses)
|
||||
|
||||
# initialize
|
||||
goal = poses[goal_idx]
|
||||
g_score[start_idx] = 0
|
||||
visit_queue.put(start_idx, np.inf)
|
||||
queueed_flag[start_idx] = 1
|
||||
|
||||
while not visit_queue.empty():
|
||||
cur_node = visit_queue.get()
|
||||
visited_flag[cur_node] = 1
|
||||
|
||||
if cur_node == goal_idx:
|
||||
print('find optimal path from {} to {}'.format(start_idx, goal_idx))
|
||||
break
|
||||
|
||||
# find neighbours
|
||||
neighbors = test_tree.query(poses[cur_node], k=k)
|
||||
|
||||
for nb_cur_dist, nb_idx in zip(neighbors[0][1:], neighbors[1][1:]):
|
||||
if visited_flag[nb_idx] == 1:
|
||||
continue
|
||||
|
||||
temp_dist = g_score[cur_node] + np.linalg.norm(poses[cur_node] - poses[nb_idx])
|
||||
# temp_dist = g_score[cur_node] + nb_cur_dist ## this not work
|
||||
if g_score[nb_idx] > temp_dist:
|
||||
g_score[nb_idx] = temp_dist
|
||||
parent_idx[nb_idx] = cur_node
|
||||
f_score = g_score[nb_idx] + np.linalg.norm(poses[nb_idx] - goal)
|
||||
|
||||
# put into queen
|
||||
if queueed_flag[nb_idx] == 0:
|
||||
visit_queue.put(nb_idx, f_score)
|
||||
queueed_flag[nb_idx] = 1
|
||||
|
||||
if cur_node != goal_idx:
|
||||
print('find closest path from {} to {}'.format(start_idx, cur_node))
|
||||
return cur_node, parent_idx
|
51
mpc_along_trajectory.py
Normal file
51
mpc_along_trajectory.py
Normal file
@@ -0,0 +1,51 @@
|
||||
from mpc_func import *
|
||||
from Astar import *
|
||||
|
||||
poses = load_poses('pose_gt.csv')
|
||||
sparseness = 100
|
||||
print(poses.shape[0]/sparseness)
|
||||
# plot_position(poses[1::sparseness])
|
||||
sparse_poses = poses[1::sparseness, 1:3]
|
||||
|
||||
start_idx = np.random.randint(sparse_poses.shape[0])
|
||||
goal_idx = np.random.randint(sparse_poses.shape[0])
|
||||
|
||||
# start_idx = 148
|
||||
# goal_idx = 6976
|
||||
# start_time = time.time()
|
||||
cur_node, parent_idx = Astar(start_idx, goal_idx, sparse_poses, k=20)
|
||||
path = find_path(cur_node, parent_idx, start_idx)
|
||||
# print(time.time() - start_time)
|
||||
print(start_idx, goal_idx)
|
||||
print(path)
|
||||
# print(len(path))
|
||||
# print(total_dist(path))
|
||||
|
||||
plt.figure(figsize=(10,10))
|
||||
plt.scatter(sparse_poses[:,0], sparse_poses[:,1], s=8)
|
||||
plt.scatter(sparse_poses[path,0], sparse_poses[path,1], c='y', s=80)
|
||||
plt.scatter(sparse_poses[start_idx,0], sparse_poses[start_idx,1], marker='o', c='g', s=200, label='start')
|
||||
plt.scatter(sparse_poses[goal_idx,0], sparse_poses[goal_idx,1], marker='*', c='r', s=200, label='goal')
|
||||
plt.legend()
|
||||
plt.show()
|
||||
|
||||
|
||||
# along trajectory
|
||||
path_poses = sparse_poses[path, :]
|
||||
|
||||
dt = 1 # [s] discrete time
|
||||
lr = 1.0 # [m]
|
||||
T = 6 # number of horizon
|
||||
max_speed = 5
|
||||
min_speed = -5
|
||||
|
||||
speed_now = 1
|
||||
theta = -1.5
|
||||
|
||||
for interval in range(len(path)//T):
|
||||
short_path_poses = path_poses[interval*T:(interval+1)*T,:]
|
||||
u, next_x, xstar, ustar = path_poses_to_input(path_poses, speed_now, theta,
|
||||
dt, lr, T, max_speed, min_speed)
|
||||
speed_now = xstar.value[2,-1]
|
||||
theta = xstar.value[3,-1]
|
||||
print(ustar.value)
|
126
mpc_func.py
Normal file
126
mpc_func.py
Normal file
@@ -0,0 +1,126 @@
|
||||
import cvxpy
|
||||
import numpy as np
|
||||
from cvxpy import *
|
||||
import matplotlib.pyplot as plt
|
||||
from math import *
|
||||
import time
|
||||
|
||||
|
||||
def LinearizeCarModel(xb, u, dt, lr):
|
||||
x = xb[0]
|
||||
y = xb[1]
|
||||
v = xb[2]
|
||||
theta = xb[3]
|
||||
|
||||
a = u[0]
|
||||
beta = u[1]
|
||||
|
||||
t1 = -dt * v * sin(theta + beta)
|
||||
t2 = dt * v * cos(theta + beta)
|
||||
|
||||
A = np.eye(xb.shape[0])
|
||||
A[0, 2] = dt * cos(theta + beta)
|
||||
A[1, 2] = dt * sin(theta + beta)
|
||||
A[3, 2] = dt * sin(beta) / lr
|
||||
#A[0, 3] = t1
|
||||
#A[1, 3] = t2
|
||||
|
||||
B = np.zeros((xb.shape[0], u.shape[0]))
|
||||
B[2, 0] = dt
|
||||
B[0, 1] = t1
|
||||
B[1, 1] = t2
|
||||
B[3, 1] = dt * v * cos(beta) / lr
|
||||
|
||||
tm = np.zeros((4, 1))
|
||||
tm[0, 0] = v * cos(theta + beta) * dt
|
||||
tm[1, 0] = v * sin(theta + beta) * dt
|
||||
tm[2, 0] = a * dt
|
||||
tm[3, 0] = v / lr * sin(beta) * dt
|
||||
C = xb + tm
|
||||
C = C - A @ xb - B @ u
|
||||
|
||||
# print(A, B, C)
|
||||
|
||||
return A, B, C
|
||||
|
||||
|
||||
def NonlinearModel(x, u, dt, lr):
|
||||
print(x.value)
|
||||
x[0] = x[0] + x[2] * cos(x[3] + u[1]) * dt
|
||||
x[1] = x[1] + x[2] * sin(x[3] + u[1]) * dt
|
||||
x[2] = x[2] + u[0] * dt
|
||||
x[3] = x[3] + x[2] / lr * sin(u[1]) * dt
|
||||
|
||||
return x
|
||||
|
||||
|
||||
def CalcInput(A, B, C, x, u, T, max_speed, min_speed, path_poses):
|
||||
|
||||
x_0 = x[:]
|
||||
x = Variable((x.shape[0], T + 1))
|
||||
u = Variable((u.shape[0], T))
|
||||
|
||||
# MPC controller
|
||||
states = []
|
||||
constr = [x[:, 0] == (x_0.T)[0]]#, x[2, T] == 0.0]
|
||||
for t in range(1,T):
|
||||
#pdb.set_trace()
|
||||
|
||||
constr += [x[:, t + 1] == A * x[:, t] + B * u[:, t] +(C.T)[0]]
|
||||
constr += [abs(u[:, t]) <= 5]
|
||||
constr += [x[2, t + 1] <= max_speed]
|
||||
constr += [x[2, t + 1] >= min_speed]
|
||||
# cost = sum_squares(u[:,t])
|
||||
cost = sum_squares(abs(x[0, t] - path_poses[t][0])) * 1 * (10-2*t)
|
||||
cost += sum_squares(abs(x[1, t] - path_poses[t][1])) * 1 * (10-2*t)
|
||||
cost += sum_squares(abs(u[1, t])) * 10 * (t+1)
|
||||
if t == T - 1:
|
||||
cost += (x[0, t + 1] - path_poses[t][0]) ** 2 * 100.0
|
||||
cost += (x[1, t + 1] - path_poses[t][1]) ** 2 * 100.0
|
||||
|
||||
states.append(Problem(Minimize(cost), constr))
|
||||
|
||||
prob = sum(states)
|
||||
#pdb.set_trace()
|
||||
#prob.constraints += [x[:, 0] == (x_0.T)[0]]#, x[2, T] == 0.0]
|
||||
|
||||
start = time.time()
|
||||
# result=prob.solve(verbose=True)
|
||||
result = prob.solve()
|
||||
elapsed_time = time.time() - start
|
||||
# print("calc time:{0}".format(elapsed_time) + "[sec]")
|
||||
# print(prob.value)
|
||||
|
||||
if prob.status != OPTIMAL:
|
||||
print("Cannot calc opt")
|
||||
|
||||
# print(prob.status)
|
||||
return u, x, prob.value
|
||||
|
||||
|
||||
def GetListFromMatrix(x):
|
||||
return np.array(x).flatten().tolist()
|
||||
|
||||
|
||||
def path_poses_to_input(path_poses, speed_now, theta, dt, lr, T, max_speed, min_speed):
|
||||
x0 = np.array([[path_poses[0][0], path_poses[0][1], speed_now, theta]]).T # [x,y,v theta]
|
||||
u = np.array([[0.0, 0.0]]).T # [a,beta]
|
||||
|
||||
x = x0
|
||||
|
||||
A, B, C = LinearizeCarModel(x, u, dt, lr)
|
||||
ustar, xstar, cost = CalcInput(A, B, C, x, u, T, max_speed, min_speed, path_poses)
|
||||
|
||||
u[0, 0] = GetListFromMatrix(ustar.value[0, :])[0]
|
||||
u[1, 0] = float(ustar[1, 0].value)
|
||||
|
||||
x = A @ x + B @ u
|
||||
|
||||
return u, x, xstar, ustar
|
||||
|
||||
|
||||
def load_poses(pose_gt_file) :
|
||||
pose_gt = np.loadtxt(pose_gt_file, delimiter = ",")
|
||||
|
||||
return pose_gt
|
||||
|
55
one_round_demo.py
Normal file
55
one_round_demo.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from mpc_func import *
|
||||
|
||||
poses = load_poses('pose_gt.csv')
|
||||
sparseness = 100
|
||||
sparse_poses = poses[1::sparseness, 1:3]
|
||||
|
||||
path = [148, 150, 151, 153, 154, 156, 158, 160, 162, 163]
|
||||
|
||||
|
||||
dt = 1 # [s] discrete time
|
||||
lr = 1.0 # [m]
|
||||
T = 6 # number of horizon
|
||||
max_speed = 5
|
||||
min_speed = -5
|
||||
|
||||
speed_now = 1
|
||||
theta = -1.5
|
||||
|
||||
path_poses = sparse_poses[path[:T+1], :]
|
||||
u, next_x, xstar, ustar = path_poses_to_input(path_poses, speed_now, theta,
|
||||
dt, lr, T, max_speed, min_speed)
|
||||
|
||||
|
||||
# plot the result
|
||||
plt.figure(figsize=(10,10))
|
||||
plt.subplot(3, 1, 1)
|
||||
plt.plot(path_poses[0][0], path_poses[0][1], 'xb', label='current pose')
|
||||
plt.plot(next_x[0], next_x[1], 'xr', label='next pose given current control output')
|
||||
plt.plot(GetListFromMatrix(xstar.value[0, :]), GetListFromMatrix(
|
||||
xstar.value[1, :]), '-.', label='estimated trajectory given control outputs')
|
||||
plt.plot(path_poses[:T,0], path_poses[:T,1], label='reference trajectory')
|
||||
plt.axis("equal")
|
||||
plt.xlabel("x[m]")
|
||||
plt.ylabel("y[m]")
|
||||
plt.legend()
|
||||
plt.grid(True)
|
||||
|
||||
plt.subplot(3, 1, 2)
|
||||
plt.cla()
|
||||
plt.plot(GetListFromMatrix(xstar.value[2, :]), '-b',label='linear velocity')
|
||||
plt.plot(GetListFromMatrix(xstar.value[3, :]), '-r',label='pose angle')
|
||||
#plt.ylim([-1.0, 1.0])
|
||||
plt.ylabel("velocity[m/s]")
|
||||
plt.xlabel("horizon")
|
||||
plt.legend()
|
||||
plt.grid(True)
|
||||
|
||||
plt.subplot(3, 1, 3)
|
||||
plt.cla()
|
||||
plt.plot(GetListFromMatrix(ustar.value[0, :]), '-r', label="acceleration")
|
||||
plt.plot(GetListFromMatrix(ustar.value[1, :]), '-b', label="beta")
|
||||
#plt.ylim([-0.5, 0.5])
|
||||
plt.legend()
|
||||
plt.grid(True)
|
||||
plt.show()
|
9
requirement.txt
Normal file
9
requirement.txt
Normal file
@@ -0,0 +1,9 @@
|
||||
sys
|
||||
matplotlib
|
||||
numpy
|
||||
scipy
|
||||
math
|
||||
random
|
||||
time
|
||||
cvxpy
|
||||
heapq
|
Reference in New Issue
Block a user