Merge branch 'velocity_control'

This commit is contained in:
Yushu Zhang
2020-04-30 19:57:52 -04:00
5 changed files with 345 additions and 0 deletions

9
requirement.txt Normal file
View File

@@ -0,0 +1,9 @@
sys
matplotlib
numpy
scipy
math
random
time
cvxpy
heapq

104
src/control/Astar.py Normal file
View File

@@ -0,0 +1,104 @@
import sys
import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate
import math
import random
import time
import heapq # https://docs.python.org/3/library/heapq.html
def load_poses(pose_gt_file) :
pose_gt = np.loadtxt(pose_gt_file, delimiter = ",")
return pose_gt
def plot_position(pose_gt):
# NED (North, East, Down)
x = pose_gt[:, 1]
y = pose_gt[:, 2]
z = pose_gt[:, 3]
plt.figure()
plt.scatter(y, x, 1, c=-z, linewidth=2) # Note Z points down
plt.axis('equal')
plt.title('Ground Truth Position of Nodes in SLAM Graph')
plt.xlabel('East (m)')
plt.ylabel('North (m)')
plt.colorbar()
plt.show()
return
class PriorityQueue:
def __init__(self):
self.elements = []
def empty(self):
return len(self.elements) == 0
def put(self, item, priority):
heapq.heappush(self.elements, (priority, item))
def get(self):
return heapq.heappop(self.elements)[1]
def find_path(cur_node, parent_idx, start_idx):
next_idx = cur_node
path = [next_idx]
while next_idx != start_idx:
next_idx = parent_idx[next_idx]
path.append(next_idx)
return path[::-1]
def Astar(start_idx, goal_idx, poses, k=20):
visit_queue = PriorityQueue()
visited_flag, queueed_flag = np.zeros(poses.shape[0]), np.zeros(poses.shape[0])
g_score, h_score = np.full(poses.shape[0], np.inf), np.full(poses.shape[0], np.inf)
parent_idx = np.zeros(poses.shape[0], dtype='int')
test_tree = scipy.spatial.KDTree(poses)
# initialize
goal = poses[goal_idx]
g_score[start_idx] = 0
visit_queue.put(start_idx, np.inf)
queueed_flag[start_idx] = 1
while not visit_queue.empty():
cur_node = visit_queue.get()
visited_flag[cur_node] = 1
if cur_node == goal_idx:
print('find optimal path from {} to {}'.format(start_idx, goal_idx))
break
# find neighbours
neighbors = test_tree.query(poses[cur_node], k=k)
for nb_cur_dist, nb_idx in zip(neighbors[0][1:], neighbors[1][1:]):
if visited_flag[nb_idx] == 1:
continue
temp_dist = g_score[cur_node] + np.linalg.norm(poses[cur_node] - poses[nb_idx])
# temp_dist = g_score[cur_node] + nb_cur_dist ## this not work
if g_score[nb_idx] > temp_dist:
g_score[nb_idx] = temp_dist
parent_idx[nb_idx] = cur_node
f_score = g_score[nb_idx] + np.linalg.norm(poses[nb_idx] - goal)
# put into queen
if queueed_flag[nb_idx] == 0:
visit_queue.put(nb_idx, f_score)
queueed_flag[nb_idx] = 1
if cur_node != goal_idx:
print('find closest path from {} to {}'.format(start_idx, cur_node))
return cur_node, parent_idx

View File

@@ -0,0 +1,51 @@
from mpc_func import *
from Astar import *
poses = load_poses('pose_gt.csv')
sparseness = 100
# print(poses.shape[0]/sparseness)
# plot_position(poses[1::sparseness])
sparse_poses = poses[1::sparseness, 1:3]
start_idx = np.random.randint(sparse_poses.shape[0])
goal_idx = np.random.randint(sparse_poses.shape[0])
# start_idx = 148
# goal_idx = 6976
# start_time = time.time()
cur_node, parent_idx = Astar(start_idx, goal_idx, sparse_poses, k=20)
path = find_path(cur_node, parent_idx, start_idx)
# print(time.time() - start_time)
#print(start_idx, goal_idx)
#print(path)
# print(len(path))
# print(total_dist(path))
plt.figure(figsize=(10,10))
plt.scatter(sparse_poses[:,0], sparse_poses[:,1], s=8)
plt.scatter(sparse_poses[path,0], sparse_poses[path,1], c='y', s=80)
plt.scatter(sparse_poses[start_idx,0], sparse_poses[start_idx,1], marker='o', c='g', s=200, label='start')
plt.scatter(sparse_poses[goal_idx,0], sparse_poses[goal_idx,1], marker='*', c='r', s=200, label='goal')
plt.legend()
plt.show()
# along trajectory
path_poses = sparse_poses[path, :]
dt = 1 # [s] discrete time
lr = 1.0 # [m]
T = 6 # number of horizon
max_speed = 5
min_speed = -5
speed_now = 1
theta = -1.5
for interval in range(len(path)//T):
short_path_poses = path_poses[interval*T:(interval+1)*T,:]
u, next_x, xstar, ustar = path_poses_to_input(path_poses, speed_now, theta,
dt, lr, T, max_speed, min_speed)
speed_now = xstar.value[2,-1]
theta = xstar.value[3,-1]
print(ustar.value)

126
src/control/mpc_func.py Normal file
View File

@@ -0,0 +1,126 @@
import cvxpy
import numpy as np
from cvxpy import *
import matplotlib.pyplot as plt
from math import *
import time
def LinearizeCarModel(xb, u, dt, lr):
x = xb[0]
y = xb[1]
v = xb[2]
theta = xb[3]
a = u[0]
beta = u[1]
t1 = -dt * v * sin(theta + beta)
t2 = dt * v * cos(theta + beta)
A = np.eye(xb.shape[0])
A[0, 2] = dt * cos(theta + beta)
A[1, 2] = dt * sin(theta + beta)
A[3, 2] = dt * sin(beta) / lr
#A[0, 3] = t1
#A[1, 3] = t2
B = np.zeros((xb.shape[0], u.shape[0]))
B[2, 0] = dt
B[0, 1] = t1
B[1, 1] = t2
B[3, 1] = dt * v * cos(beta) / lr
tm = np.zeros((4, 1))
tm[0, 0] = v * cos(theta + beta) * dt
tm[1, 0] = v * sin(theta + beta) * dt
tm[2, 0] = a * dt
tm[3, 0] = v / lr * sin(beta) * dt
C = xb + tm
C = C - A @ xb - B @ u
# print(A, B, C)
return A, B, C
def NonlinearModel(x, u, dt, lr):
print(x.value)
x[0] = x[0] + x[2] * cos(x[3] + u[1]) * dt
x[1] = x[1] + x[2] * sin(x[3] + u[1]) * dt
x[2] = x[2] + u[0] * dt
x[3] = x[3] + x[2] / lr * sin(u[1]) * dt
return x
def CalcInput(A, B, C, x, u, T, max_speed, min_speed, path_poses):
x_0 = x[:]
x = Variable((x.shape[0], T + 1))
u = Variable((u.shape[0], T))
# MPC controller
states = []
constr = [x[:, 0] == (x_0.T)[0]]#, x[2, T] == 0.0]
for t in range(1,T):
#pdb.set_trace()
constr += [x[:, t + 1] == A * x[:, t] + B * u[:, t] +(C.T)[0]]
constr += [abs(u[:, t]) <= 5]
constr += [x[2, t + 1] <= max_speed]
constr += [x[2, t + 1] >= min_speed]
# cost = sum_squares(u[:,t])
cost = sum_squares(abs(x[0, t] - path_poses[t][0])) * 1 * (10-2*t)
cost += sum_squares(abs(x[1, t] - path_poses[t][1])) * 1 * (10-2*t)
cost += sum_squares(abs(u[1, t])) * 10 * (t+1)
if t == T - 1:
cost += (x[0, t + 1] - path_poses[t][0]) ** 2 * 100.0
cost += (x[1, t + 1] - path_poses[t][1]) ** 2 * 100.0
states.append(Problem(Minimize(cost), constr))
prob = sum(states)
#pdb.set_trace()
#prob.constraints += [x[:, 0] == (x_0.T)[0]]#, x[2, T] == 0.0]
start = time.time()
# result=prob.solve(verbose=True)
result = prob.solve()
elapsed_time = time.time() - start
# print("calc time:{0}".format(elapsed_time) + "[sec]")
# print(prob.value)
if prob.status != OPTIMAL:
print("Cannot calc opt")
# print(prob.status)
return u, x, prob.value
def GetListFromMatrix(x):
return np.array(x).flatten().tolist()
def path_poses_to_input(path_poses, speed_now, theta, dt, lr, T, max_speed, min_speed):
x0 = np.array([[path_poses[0][0], path_poses[0][1], speed_now, theta]]).T # [x,y,v theta]
u = np.array([[0.0, 0.0]]).T # [a,beta]
x = x0
A, B, C = LinearizeCarModel(x, u, dt, lr)
ustar, xstar, cost = CalcInput(A, B, C, x, u, T, max_speed, min_speed, path_poses)
u[0, 0] = GetListFromMatrix(ustar.value[0, :])[0]
u[1, 0] = float(ustar[1, 0].value)
x = A @ x + B @ u
return u, x, xstar, ustar
def load_poses(pose_gt_file) :
pose_gt = np.loadtxt(pose_gt_file, delimiter = ",")
return pose_gt

View File

@@ -0,0 +1,55 @@
from mpc_func import *
poses = load_poses('pose_gt.csv')
sparseness = 100
sparse_poses = poses[1::sparseness, 1:3]
path = [148, 150, 151, 153, 154, 156, 158, 160, 162, 163]
dt = 1 # [s] discrete time
lr = 1.0 # [m]
T = 6 # number of horizon
max_speed = 5
min_speed = -5
speed_now = 1
theta = -1.5
path_poses = sparse_poses[path[:T+1], :]
u, next_x, xstar, ustar = path_poses_to_input(path_poses, speed_now, theta,
dt, lr, T, max_speed, min_speed)
# plot the result
plt.figure(figsize=(10,10))
plt.subplot(3, 1, 1)
plt.plot(path_poses[0][0], path_poses[0][1], 'xb', label='current pose')
plt.plot(next_x[0], next_x[1], 'xr', label='next pose given current control output')
plt.plot(GetListFromMatrix(xstar.value[0, :]), GetListFromMatrix(
xstar.value[1, :]), '-.', label='estimated trajectory given control outputs')
plt.plot(path_poses[:T,0], path_poses[:T,1], label='reference trajectory')
plt.axis("equal")
plt.xlabel("x[m]")
plt.ylabel("y[m]")
plt.legend()
plt.grid(True)
plt.subplot(3, 1, 2)
plt.cla()
plt.plot(GetListFromMatrix(xstar.value[2, :]), '-b',label='linear velocity')
plt.plot(GetListFromMatrix(xstar.value[3, :]), '-r',label='pose angle')
#plt.ylim([-1.0, 1.0])
plt.ylabel("velocity[m/s]")
plt.xlabel("horizon")
plt.legend()
plt.grid(True)
plt.subplot(3, 1, 3)
plt.cla()
plt.plot(GetListFromMatrix(ustar.value[0, :]), '-r', label="acceleration")
plt.plot(GetListFromMatrix(ustar.value[1, :]), '-b', label="beta")
#plt.ylim([-0.5, 0.5])
plt.legend()
plt.grid(True)
plt.show()