From 26e641a477075984ff970a0338b9ba4a739d47fa Mon Sep 17 00:00:00 2001 From: dbrisbin Date: Thu, 30 Apr 2020 19:42:26 -0400 Subject: [PATCH] planning functions notebook --- src/planning/Motion_Planning_Functions.ipynb | 611 +++++++++++++++++++ 1 file changed, 611 insertions(+) create mode 100644 src/planning/Motion_Planning_Functions.ipynb diff --git a/src/planning/Motion_Planning_Functions.ipynb b/src/planning/Motion_Planning_Functions.ipynb new file mode 100644 index 0000000..241a274 --- /dev/null +++ b/src/planning/Motion_Planning_Functions.ipynb @@ -0,0 +1,611 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "name": "Motion_Planning_Functions", + "provenance": [], + "collapsed_sections": [] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "accelerator": "GPU" + }, + "cells": [ + { + "cell_type": "code", + "metadata": { + "id": "NoNMujanjNuu", + "colab_type": "code", + "colab": {} + }, + "source": [ + "#Given a map consisting of known poses and a start and end pose, find the optimal path between using A*\n", + "#Generate the relative motion in se2 between poses.\n", + "#This is straight line motion.\n", + "#Also implements cubic interpolation for a smooth trajectory across all points in path." + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "VqXKrUKwt05L", + "colab_type": "code", + "colab": {} + }, + "source": [ + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import random\n", + "import scipy.interpolate\n", + "import heapq #https://docs.python.org/3/library/heapq.html" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "VMJWsOvIvGeJ", + "colab_type": "code", + "colab": {} + }, + "source": [ + "#Loading poses from the ground truth file\n", + "def load_poses(pose_gt_file) :\n", + " pose_gt = np.loadtxt(pose_gt_file, delimiter = \",\")\n", + " return pose_gt[1:, 1:3]\n", + "\n", + "poses = load_poses('../dataset/ground_truth/groundtruth_2012-01-08.csv')" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "jQQn9T3U4yA2", + "colab_type": "code", + "colab": {} + }, + "source": [ + "#Astar and path functions\n", + "class PriorityQueue:\n", + " def __init__(self):\n", + " self.elements = []\n", + " \n", + " def empty(self):\n", + " return len(self.elements) == 0\n", + " \n", + " def put(self, item, priority):\n", + " heapq.heappush(self.elements, (priority, item))\n", + " \n", + " def get(self):\n", + " return heapq.heappop(self.elements)[1]\n", + "\n", + "class Astar :\n", + " # This class implements A* search along a network defined by several points\n", + " # Poses is an array of coordinates\n", + " # k defines how many nearest neighbors to look at during A* search\n", + " # The primary usage of this class is the find_path function:\n", + " # Required parameters:\n", + " # start_idx: \n", + " # goal_idx \n", + "\n", + " def __init__(self, poses) :\n", + " self.poses = poses\n", + " self.full_tree = scipy.spatial.KDTree(self.poses)\n", + "\n", + " def _extract_path(self, cur_node, parent_idx, start_idx, sparse_poses):\n", + " next_idx = cur_node\n", + " path = [self.full_tree.query(sparse_poses[next_idx])[1]]\n", + "\n", + " while next_idx != start_idx:\n", + " next_idx = parent_idx[next_idx]\n", + " path.append(self.full_tree.query(sparse_poses[next_idx])[1])\n", + " \n", + " return path[::-1]\n", + "\n", + " def find_path(self, full_start_idx, full_goal_idx, sparseness=1, k=5):\n", + " sparse_poses = poses[0::sparseness, :]\n", + " \n", + " visit_queue = PriorityQueue()\n", + " visited_flag, queueed_flag = np.zeros(sparse_poses.shape[0]), np.zeros(sparse_poses.shape[0])\n", + " g_score, h_score = np.full(sparse_poses.shape[0], np.inf), np.full(sparse_poses.shape[0], np.inf)\n", + " parent_idx = np.zeros(sparse_poses.shape[0], dtype='int')\n", + "\n", + " sparse_tree = scipy.spatial.KDTree(sparse_poses)\n", + "\n", + " start_idx = sparse_tree.query(poses[full_start_idx])[1]\n", + " goal_idx = sparse_tree.query(poses[full_goal_idx])[1]\n", + " \n", + " # initialize\n", + " goal = sparse_poses[goal_idx]\n", + "\n", + " g_score[start_idx] = 0\n", + " visit_queue.put(start_idx, np.inf)\n", + " queueed_flag[start_idx] = 1\n", + " optimal = False\n", + "\n", + " while not visit_queue.empty():\n", + " cur_node = visit_queue.get()\n", + " visited_flag[cur_node] = 1\n", + "\n", + " if cur_node == goal_idx:\n", + " optimal = True\n", + " break\n", + "\n", + " # find neighbours\n", + " neighbors = sparse_tree.query(sparse_poses[cur_node], k=k)\n", + "\n", + " for nb_cur_dist, nb_idx in zip(neighbors[0][1:], neighbors[1][1:]):\n", + " if visited_flag[nb_idx] == 1:\n", + " continue\n", + "\n", + " temp_dist = g_score[cur_node] + np.linalg.norm(sparse_poses[cur_node] - sparse_poses[nb_idx])\n", + " # temp_dist = g_score[cur_node] + nb_cur_dist ## this not work\n", + " if g_score[nb_idx] > temp_dist:\n", + " g_score[nb_idx] = temp_dist\n", + " parent_idx[nb_idx] = cur_node\n", + " f_score = g_score[nb_idx] + np.linalg.norm(sparse_poses[nb_idx] - goal)\n", + " \n", + " # put into queen\n", + " if queueed_flag[nb_idx] == 0:\n", + " visit_queue.put(nb_idx, f_score)\n", + " queueed_flag[nb_idx] = 1\n", + "\n", + " path = self._extract_path(cur_node, parent_idx, start_idx, sparse_poses) \n", + " path[0] = full_start_idx\n", + " path[-1] = full_goal_idx\n", + "\n", + " return path, optimal\n", + " \n", + " def find_local_path(self, start_pose, path, steps=5) :\n", + " set_trace()\n", + "\n", + " path_tree = scipy.spatial.KDTree(self.poses[path])\n", + " path_idx = path_tree.query(start_pose)[1]\n", + " start_idx = self.full_tree.query(self.poses[path[path_idx]])[1]\n", + "\n", + " if path_idx + 5 < len(path) :\n", + " goal_idx =self.full_tree.query(self.poses[path[path_idx + steps]])[1]\n", + " else :\n", + " goal_idx =self.full_tree.query(self.poses[path[-1]])[1]\n", + "\n", + " local_path, _ = self.find_path(start_idx, goal_idx)\n", + "\n", + " return local_path\n", + "\n", + "def total_dist_fun(poses) :\n", + " total_dist = 0\n", + " curr_point = poses[0]\n", + "\n", + " for idx in range(1, poses.shape[0]) :\n", + " total_dist += np.linalg.norm(curr_point - poses[idx])\n", + " curr_point = poses[idx]\n", + " \n", + " return total_dist \n" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "cvOUf76S5zFZ", + "colab_type": "code", + "colab": {} + }, + "source": [ + "#construct A* instance\n", + "astar = Astar(poses)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "Mh_A8aKdvwEt", + "colab_type": "code", + "outputId": "5e84feb2-7cc6-4ab2-a4de-c873f3d7b96f", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 51 + } + }, + "source": [ + "#Test A*\n", + "start_idx = np.random.randint(poses.shape[0])\n", + "goal_idx = np.random.randint(poses.shape[0])\n", + "\n", + "path, optimal = astar.find_path(start_idx, goal_idx, sparseness=10, k=50)" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "stream", + "text": [ + "8260\n", + "37563\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "CkqvtjJ65x4Y", + "colab_type": "code", + "outputId": "f538c889-e5dd-43a1-b615-213b9161cf75", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 635 + } + }, + "source": [ + "#Plot computed path\n", + "plt.figure(figsize=(16,9))\n", + "plt.scatter(poses[:,1], poses[:,0], s=1)\n", + "plt.scatter(poses[path,1], poses[path,0], c='y', s=20)\n", + "plt.scatter(poses[start_idx,1], poses[start_idx,0], marker='o', c='g', s=500, label='start')\n", + "plt.scatter(poses[goal_idx,1], poses[goal_idx,0], marker='*', c='r', s=750, label='goal')\n", + "plt.legend()\n", + "plt.title('Ground Truth Position of Nodes with Overlaid A* Path')\n", + "plt.xlabel('East (m)')\n", + "plt.ylabel('North (m)')\n", + "plt.axis('equal')" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "(-760.4307177277268,\n", + " 110.22309646812992,\n", + " -360.7839962824075,\n", + " 100.82784220736045)" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 318 + }, + { + "output_type": "display_data", + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "tags": [], + "needs_background": "light" + } + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "CRtQebcnjeSM", + "colab_type": "code", + "colab": {} + }, + "source": [ + "#SE(2) functions\n", + "def matrix_log_SO2(SO2_mat) :\n", + " #ln(R) in SO(3) = theta\n", + " return np.arctan2(SO2_mat[1,0], SO2_mat[0, 0])\n", + "\n", + "def matrix_log_SE2(SE2_mat) :\n", + " theta = matrix_log_SO2(SE2_mat[0:2, 0:2])\n", + " if (theta < 1e-6) :\n", + " A = 1\n", + " B = 0\n", + " else :\n", + " A = np.sin(theta)/theta\n", + " B = (1-np.cos(theta))/theta\n", + "\n", + " v_inv = 1/(A**2 + B**2) * np.array([[A, B], [-B, A]])\n", + " mat_log = np.array(np.matmul(v_inv, SE2_mat[0:2, 2]))\n", + " mat_log = np.append(mat_log, theta)\n", + "\n", + " return mat_log\n", + "\n", + "def matrix_exp_so2(theta) :\n", + " #reconstruct R.\n", + " return np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])\n", + "\n", + "def matrix_exp_se2(twist) :\n", + " theta = twist[-1]\n", + " R = matrix_exp_so2(theta)\n", + " \n", + " #V converges to I2\n", + " if (theta < 1e-6) :\n", + " V = np.eye(2)\n", + " else:\n", + " V = 1/theta * np.array([[np.sin(theta), -(1 - np.cos(theta))], [(1-np.cos(theta)), np.sin(theta)]])\n", + "\n", + " mat_exp = np.zeros((3,3))\n", + " mat_exp[0:2, 0:2] = R\n", + " mat_exp[0:2, 2] = np.matmul(V, twist[0:2])\n", + " mat_exp[2, 2] = 1\n", + " \n", + " return mat_exp\n", + "\n", + "def get_twist_SE2(Xstart, pos_end, pos_future=None) :\n", + " \n", + " Xend = np.zeros((3,3))\n", + " \n", + " Xend[-1,-1] = 1\n", + " Xend[0:2, 2] = pos_end\n", + " \n", + " #compute end direction (face in direction of future step i.e. end+1)\n", + " if not pos_future is None:\n", + " next_displacement = pos_future - pos_end\n", + " next_theta = np.arctan2(next_displacement[1], next_displacement[0])\n", + " Xend[0:2, 0:2] = np.array([[np.cos(next_theta), -np.sin(next_theta)], [np.sin(next_theta), np.cos(next_theta)]])\n", + " else :\n", + " Xend[0:2, 0:2] = Xstart[0:2, 0:2]\n", + "\n", + " # set_trace()\n", + " twist_SE2 = matrix_log_SE2(np.matmul(np.linalg.inv(Xstart), Xend))\n", + "\n", + " return twist_SE2, Xend\n", + "\n", + "def twist_motion(Xstart, twist, s=1) :\n", + " return np.matmul(Xstart, s * matrix_exp_se2(twist))" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "XkTTjfR_EaGT", + "colab_type": "code", + "outputId": "5aeff481-c2f9-461b-e6e9-85e0f6b4ea12", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 136 + } + }, + "source": [ + "print('testing exponential map for SE2')\n", + "thetas = [0, 1e-4, np.pi/2, np.pi, 15*np.pi/8, 4.5 * np.pi]\n", + "\n", + "for theta in thetas :\n", + " test_SE2 = np.array([[np.cos(theta), -np.sin(theta), 1.5], [np.sin(theta), np.cos(theta), 2], [0, 0, 1]])\n", + " \n", + " twist = matrix_log_SE2(test_SE2)\n", + " SE2_res = matrix_exp_se2(twist)\n", + " \n", + " assert(np.sum(test_SE2 - SE2_res) < 1e-6)\n", + " print('passed theta = ', theta)" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "stream", + "text": [ + "testing exponential map for SE2\n", + "passed theta = 0\n", + "passed theta = 0.0001\n", + "passed theta = 1.5707963267948966\n", + "passed theta = 3.141592653589793\n", + "passed theta = 5.890486225480862\n", + "passed theta = 14.137166941154069\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "LsDb3F7fKBol", + "colab_type": "code", + "outputId": "fdeaf8f7-a326-4ea7-b495-7fb751b784e0", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 51 + } + }, + "source": [ + "#Generation of twists and executing path\n", + "print('testing motion twist generation')\n", + "\n", + "pos_end = np.array([x1, y1])\n", + "pos_future = np.array([x2, y2])\n", + "\n", + "twists = []\n", + "\n", + "Xstart = np.eye(3)\n", + "Xstart[0:2, 2] = poses[path[0]]\n", + "\n", + "poses = np.array(poses)\n", + "\n", + "for pose_idx, path_idx in enumerate(path[1:-1]) :\n", + " twist, Xstart = get_twist_SE2(Xstart, poses[pose_idx], poses[path_idx + 1])\n", + " twists.append(twist)\n", + " #print(twist)\n", + "\n", + "twist, Xend = get_twist_SE2(Xstart, poses[-1])\n", + "twists.append(twist)\n", + "\n", + "\n", + "Xk = np.eye(3)\n", + "Xk[0:2, 2] = poses[path[0]]\n", + "\n", + "for twist in twists :\n", + " Xk = twist_motion(Xk, twist)\n", + "\n", + "assert(np.sum(Xk - Xend) < 1e-6)\n", + "print('passed')" + ], + "execution_count": 359, + "outputs": [ + { + "output_type": "stream", + "text": [ + "testing motion twist generation\n", + "passed\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "PaYwXvSfeO4k", + "colab_type": "code", + "outputId": "e53f3ada-f8bd-4ec8-ffd2-124154658d74", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 985 + } + }, + "source": [ + "#Cubic interpolation of poses.\n", + "\n", + "poss = np.array(poses[path])\n", + "velocities = np.zeros(poss.shape) \n", + "T = np.zeros(poss.shape[0])\n", + "\n", + "total_time = 100\n", + "total_dist = total_dist_fun(poss)\n", + "cum_dist = 0\n", + "\n", + "velocities[0] = 0\n", + "\n", + "for i in range(1, poss.shape[0] - 1) :\n", + " seg_dist = np.linalg.norm(poss[i+1] - poss[i])\n", + " velocities[i] = (((poss[i+1] - poss[i]) / seg_dist) + velocities[i-1])/2\n", + " T[i] = total_time * cum_dist/total_dist\n", + " cum_dist += seg_dist\n", + " \n", + "T[-1] = total_time\n", + "velocities[-1] = 0\n", + "print(velocities)\n", + "\n", + "a = np.zeros((poss.shape[0], 4, poss.shape[1]))\n", + "\n", + "for j in range(0, poss.shape[0]-1) :\n", + " del_Tj = T[j+1] - T[j]\n", + " a[j, 0] = poss[j]\n", + " a[j, 1] = velocities[j]\n", + " a[j, 2] = (3 * poss[j+1] - 3 * poss[j] - 2 * velocities[j] * del_Tj - velocities[j+1] * del_Tj)/ (del_Tj**2)\n", + " a[j, 3] = (2 * poss[j] + (velocities[j] + velocities[j+1]) * del_Tj - 2 * poss[j + 1]) / (del_Tj**3)\n", + "\n", + "del_t = 0.005\n", + "pos_x = [a[0,0][0]]\n", + "pos_y = [a[0,0][1]]\n", + "vel_x = [0]\n", + "vel_y = [0]\n", + "\n", + "total_trial = 100\n", + "\n", + "for t in np.arange(del_t, total_trial, del_t) :\n", + " j = np.argmax(T > t)-1\n", + " delta_t = t - T[j]\n", + " pos_t = a[j, 0] + a[j, 1]* delta_t + a[j, 2] * (delta_t**2) + a[j, 3] * (delta_t**3)\n", + " pos_x.append(pos_t[0])\n", + " pos_y.append(pos_t[1])\n", + " vel_x.append((pos_x[-1] - pos_x[-2])/del_t)\n", + " vel_y.append((pos_y[-1] - pos_y[-2])/del_t)\n", + "\n", + "t = np.arange(0, total_trial, del_t)\n", + "\n", + "plt.figure(figsize=(16,9))\n", + "plt.plot(t[1:405], pos_x[1:405], linestyle='-', c='r', label='interpolated x position')\n", + "# plt.scatter(t[0:400], pos_y[0:400], label='y position')\n", + "\n", + "plt.scatter(T[1:14], poss[1:14,0], c='b')\n", + "plt.plot(T[1:14], poss[1:14,0], linestyle='-', c='g', label='no interpolation')\n", + "# plt.scatter(T[0:10], poss[0:10,1], label='y no interp')\n", + "\n", + "plt.legend()\n", + "plt.title('position with cubic interpolation of via points')\n", + "plt.xlabel('Time (s)')\n", + "plt.ylabel('Position (m)')\n", + "\n", + "\n", + "plt.figure()\n", + "plt.scatter(t[2:500], vel_x[2:500])\n", + "plt.scatter(t[2:500], vel_y[2:500])" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "stream", + "text": [ + "[[0. 0. ]\n", + " [0.06225802 0.4961088 ]\n", + " [0.09777187 0.74359323]\n", + " ...\n", + " [0.61079301 0.79024087]\n", + " [0.64382541 0.76317733]\n", + " [0. 0. ]]\n" + ], + "name": "stdout" + }, + { + "output_type": "stream", + "text": [ + "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:27: RuntimeWarning: divide by zero encountered in true_divide\n", + "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:28: RuntimeWarning: divide by zero encountered in true_divide\n" + ], + "name": "stderr" + }, + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 356 + }, + { + "output_type": "display_data", + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "tags": [], + "needs_background": "light" + } + }, + { + "output_type": "display_data", + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "tags": [], + "needs_background": "light" + } + } + ] + } + ] +} \ No newline at end of file