Files
polex/poles/particlefilter.py
Alexander Schaefer 17877ef198 Add code.
2019-07-25 10:28:19 +02:00

86 lines
3.3 KiB
Python

#!/usr/bin/env python
import warnings
import numpy as np
import scipy
import util
class particlefilter:
def __init__(self, count, start, posrange, angrange,
polemeans, polevar, T_w_o=np.identity(4)):
self.p_min = 0.01
self.d_max = np.sqrt(-2.0 * polevar * np.log(
np.sqrt(2.0 * np.pi * polevar) * self.p_min))
self.minneff = 0.5
self.estimatetype = 'best'
self.count = count
r = np.random.uniform(low=0.0, high=posrange, size=[self.count, 1])
angle = np.random.uniform(low=-np.pi, high=np.pi, size=[self.count, 1])
xy = r * np.hstack([np.cos(angle), np.sin(angle)])
dxyp = np.hstack([xy, np.random.uniform(
low=-angrange, high=angrange, size=[self.count, 1])])
self.particles = np.matmul(start, util.xyp2ht(dxyp))
self.weights = np.full(self.count, 1.0 / self.count)
self.polemeans = polemeans
self.poledist = scipy.stats.norm(loc=0.0, scale=np.sqrt(polevar))
self.kdtree = scipy.spatial.cKDTree(polemeans[:, :2], leafsize=3)
self.T_w_o = T_w_o
self.T_o_w = util.invert_ht(self.T_w_o)
@property
def neff(self):
return 1.0 / (np.sum(self.weights**2.0) * self.count)
def update_motion(self, mean, cov):
T_r0_r1 = util.xyp2ht(
np.random.multivariate_normal(mean, cov, self.count))
self.particles = np.matmul(self.particles, T_r0_r1)
def update_measurement(self, poleparams, resample=True):
n = poleparams.shape[0]
polepos_r = np.hstack(
[poleparams[:, :2], np.zeros([n, 1]), np.ones([n, 1])]).T
for i in range(self.count):
polepos_w = self.particles[i].dot(polepos_r)
d, _ = self.kdtree.query(
polepos_w[:2].T, k=1, distance_upper_bound=self.d_max)
self.weights[i] *= np.prod(
self.poledist.pdf(np.clip(d, 0.0, self.d_max)) + 0.1)
self.weights /= np.sum(self.weights)
if resample and self.neff < self.minneff:
self.resample()
def estimate_pose(self):
if self.estimatetype == 'mean':
xyp = util.ht2xyp(np.matmul(self.T_o_w, self.particles))
mean = np.hstack(
[np.average(xyp[:, :2], axis=0, weights=self.weights),
util.average_angles(xyp[:, 2], weights=self.weights)])
return self.T_w_o.dot(util.xyp2ht(mean))
if self.estimatetype == 'max':
return self.particles[np.argmax(self.weights)]
if self.estimatetype == 'best':
i = np.argsort(self.weights)[-int(0.1 * self.count):]
xyp = util.ht2xyp(np.matmul(self.T_o_w, self.particles[i]))
mean = np.hstack(
[np.average(xyp[:, :2], axis=0, weights=self.weights[i]),
util.average_angles(xyp[:, 2], weights=self.weights[i])])
return self.T_w_o.dot(util.xyp2ht(mean))
def resample(self):
cumsum = np.cumsum(self.weights)
pos = np.random.rand() / self.count
idx = np.empty(self.count, dtype=np.int)
ics = 0
for i in range(self.count):
while cumsum[ics] < pos:
ics += 1
idx[i] = ics
pos += 1.0 / self.count
self.particles = self.particles[idx]
self.weights[:] = 1.0 / self.count