mirror of
https://github.com/Mobile-Robotics-W20-Team-9/polex.git
synced 2025-09-07 21:23:13 +00:00
86 lines
3.3 KiB
Python
86 lines
3.3 KiB
Python
#!/usr/bin/env python
|
|
|
|
import warnings
|
|
|
|
import numpy as np
|
|
import scipy
|
|
|
|
import util
|
|
|
|
|
|
class particlefilter:
|
|
def __init__(self, count, start, posrange, angrange,
|
|
polemeans, polevar, T_w_o=np.identity(4)):
|
|
self.p_min = 0.01
|
|
self.d_max = np.sqrt(-2.0 * polevar * np.log(
|
|
np.sqrt(2.0 * np.pi * polevar) * self.p_min))
|
|
self.minneff = 0.5
|
|
self.estimatetype = 'best'
|
|
self.count = count
|
|
r = np.random.uniform(low=0.0, high=posrange, size=[self.count, 1])
|
|
angle = np.random.uniform(low=-np.pi, high=np.pi, size=[self.count, 1])
|
|
xy = r * np.hstack([np.cos(angle), np.sin(angle)])
|
|
dxyp = np.hstack([xy, np.random.uniform(
|
|
low=-angrange, high=angrange, size=[self.count, 1])])
|
|
self.particles = np.matmul(start, util.xyp2ht(dxyp))
|
|
self.weights = np.full(self.count, 1.0 / self.count)
|
|
self.polemeans = polemeans
|
|
self.poledist = scipy.stats.norm(loc=0.0, scale=np.sqrt(polevar))
|
|
self.kdtree = scipy.spatial.cKDTree(polemeans[:, :2], leafsize=3)
|
|
self.T_w_o = T_w_o
|
|
self.T_o_w = util.invert_ht(self.T_w_o)
|
|
|
|
@property
|
|
def neff(self):
|
|
return 1.0 / (np.sum(self.weights**2.0) * self.count)
|
|
|
|
def update_motion(self, mean, cov):
|
|
T_r0_r1 = util.xyp2ht(
|
|
np.random.multivariate_normal(mean, cov, self.count))
|
|
self.particles = np.matmul(self.particles, T_r0_r1)
|
|
|
|
def update_measurement(self, poleparams, resample=True):
|
|
n = poleparams.shape[0]
|
|
polepos_r = np.hstack(
|
|
[poleparams[:, :2], np.zeros([n, 1]), np.ones([n, 1])]).T
|
|
for i in range(self.count):
|
|
polepos_w = self.particles[i].dot(polepos_r)
|
|
d, _ = self.kdtree.query(
|
|
polepos_w[:2].T, k=1, distance_upper_bound=self.d_max)
|
|
self.weights[i] *= np.prod(
|
|
self.poledist.pdf(np.clip(d, 0.0, self.d_max)) + 0.1)
|
|
self.weights /= np.sum(self.weights)
|
|
|
|
if resample and self.neff < self.minneff:
|
|
self.resample()
|
|
|
|
def estimate_pose(self):
|
|
if self.estimatetype == 'mean':
|
|
xyp = util.ht2xyp(np.matmul(self.T_o_w, self.particles))
|
|
mean = np.hstack(
|
|
[np.average(xyp[:, :2], axis=0, weights=self.weights),
|
|
util.average_angles(xyp[:, 2], weights=self.weights)])
|
|
return self.T_w_o.dot(util.xyp2ht(mean))
|
|
if self.estimatetype == 'max':
|
|
return self.particles[np.argmax(self.weights)]
|
|
if self.estimatetype == 'best':
|
|
i = np.argsort(self.weights)[-int(0.1 * self.count):]
|
|
xyp = util.ht2xyp(np.matmul(self.T_o_w, self.particles[i]))
|
|
mean = np.hstack(
|
|
[np.average(xyp[:, :2], axis=0, weights=self.weights[i]),
|
|
util.average_angles(xyp[:, 2], weights=self.weights[i])])
|
|
return self.T_w_o.dot(util.xyp2ht(mean))
|
|
|
|
def resample(self):
|
|
cumsum = np.cumsum(self.weights)
|
|
pos = np.random.rand() / self.count
|
|
idx = np.empty(self.count, dtype=np.int)
|
|
ics = 0
|
|
for i in range(self.count):
|
|
while cumsum[ics] < pos:
|
|
ics += 1
|
|
idx[i] = ics
|
|
pos += 1.0 / self.count
|
|
self.particles = self.particles[idx]
|
|
self.weights[:] = 1.0 / self.count
|