import matplotlib.pyplot as plt
%matplotlib inline
import numpy
from numpy.linalg import pinv
The point of this homework is to get you familiar with putting together filters of various types. The goal is not to get buried in tricky dynamics or measurement models, but to practice building and diagnosing filters so that you can apply them to more complex systems. As such, we'll use an easily differentiable dynamics and measurement model, and I will provide you with the time history of measurements.
Use whatever programming language you're most comfortable with.
Dynamics, measurements, and initial estimates under consideration:

$$x_{k+1} = 2\arctan(x_k) + \frac{1}{2}\cos\!\left(\frac{\pi}{3}k\right) + v_k$$

where $v_k \sim \mathcal{N}(0, Q)$ with $Q = 1$, and

$$z_k = x_k + x_k^2 + x_k^3 + w_k$$

where $w_k \sim \mathcal{N}(0, R)$ with $R = 0.25$. The initial estimates are $\hat{x}_0 = 4$ and $P_0 = 2$.
You all don't need to recreate this; just import the data from the files.
# Commented out so that I don't
# accidentally run this cell and
# make a new trajectory
##########################
# xhat0 = 4.
# Phat0 = 2.
# Q = 1.
# R = 0.25
# N = 100
# def createTrajectory(xhat0, P0, Q, R):
#     x0 = numpy.sqrt(P0)*numpy.random.randn()
#     x_array = numpy.zeros(N)
#     meas_array = numpy.zeros(N)
#     x_prev = x0
#     file = open('trajectory.txt', 'w')
#     file2 = open('measurements.txt', 'w')
#     for i in range(N):
#         x_array[i] = (expectedUpdate(x_prev, i) +
#                       numpy.sqrt(Q)*numpy.random.randn())
#         file.write(str(x_array[i]) + '\n')
#         meas_array[i] = (expectedMeasurement(x_array[i]) +
#                          numpy.sqrt(R)*numpy.random.randn())
#         file2.write(str(meas_array[i]) + '\n')
#         x_prev = x_array[i]
#     file.close()
#     file2.close()
#     return x_array, meas_array
# traj = createTrajectory(xhat0, Phat0, Q, R)
# plt.plot(traj[0], 'r.')
# plt.title('Trajectory'); plt.xlabel('Sample Number')
# plt.show()
# plt.plot(traj[1], 'b.')
# plt.title('Measurement'); plt.xlabel('Sample Number')
# plt.show()
Import the measurements and the true trajectory.
with open("measurements.txt") as f:
measurements = map(float, f)
with open("trajectory.txt") as f:
true_trajectory = map(float, f)
Expected State Update:
def expectedUpdate(x, k):
    # noise-free dynamics: 2*arctan(x_k) + 0.5*cos(pi*k/3)
    return 2*numpy.arctan(x) + .5*numpy.cos((numpy.pi/3.)*k)
Expected Measurement Update:
def expectedMeasurement(x):
    # noise-free measurement: x + x^2 + x^3
    return x + x**2. + x**3.
Ns = 1000  # number of particles
Nt = 100   # effective-particle threshold below which we resample
N = len(measurements)  # number of time steps
P0 = numpy.array([[2.]])
x0 = numpy.array([4.])
Q = numpy.array([[1.]])
R = numpy.array([[.25]])
def resetWeights():
    # uniform weights, tracked in the log domain for numerical stability
    ws = numpy.ones(Ns)*(1./Ns)
    logweights = numpy.log(ws)
    return ws, logweights
def resetParticles(x, P):
    particles_k = numpy.random.multivariate_normal(x, P, Ns)
    return particles_k
def resetNoise():
    noise_k = numpy.random.multivariate_normal(numpy.zeros(1), Q, Ns)
    return noise_k
def propagateParticles(particles, noise, k):
    new_particles = numpy.zeros((Ns, 1))
    for i in range(Ns):
        new_particles[i,:] = expectedUpdate(particles[i,0], k) + noise[i,0]
    return new_particles
def updateWeights(index, new_particles, logweights):
    measurement = measurements[index]
    for i in range(Ns):
        innovation = measurement - expectedMeasurement(new_particles[i,0])
        logweights[i] += -.5*innovation*(1./R[0,0])*innovation
    # normalize with the max-log trick to avoid exponent underflow
    maxlog = numpy.max(logweights)
    intermediate_weights = numpy.exp(logweights - maxlog)
    new_weights = intermediate_weights/numpy.sum(intermediate_weights)
    return logweights, new_weights
def computeEstimate(weights, particles):
    # weighted mean and covariance over the particle set
    x_new = numpy.zeros(1)
    P_new = numpy.zeros((1,1))
    for i in range(Ns):
        x_new += weights[i]*particles[i,:]
    for i in range(Ns):
        diff = numpy.array([particles[i,:] - x_new])
        P_new += weights[i]*numpy.dot(diff.T, diff)
    return x_new, P_new
def effectiveParticles(weights):
    # effective sample size: 1/sum(w_i^2)
    return 1./numpy.sum(weights*weights)
def runParticleFilter():
    P = P0
    x_start = x0
    ws, logweights = resetWeights()
    particles_k = resetParticles(x_start, P)
    noise = resetNoise()
    Neff = Ns
    P_array = numpy.zeros((N, 1, 1))
    x_array = numpy.zeros((N, 1))
    P_array[0] = P
    x_array[0,:] = x_start
    for i in range(N):
        new_particles = propagateParticles(particles_k, noise, i)
        logweights, ws = updateWeights(i, new_particles, logweights)
        x_new, P_new = computeEstimate(ws, new_particles)
        Neff = effectiveParticles(ws)
        P_array[i] = P_new
        x_array[i,:] = x_new
        if Neff < Nt:
            # too few effective particles: resample about the current estimate
            particles_k = resetParticles(x_new, P_new)
            noise = resetNoise()
            ws, logweights = resetWeights()
        else:
            particles_k = new_particles
            noise = resetNoise()
    return x_array, P_array
pfresults = runParticleFilter()
pf_traj, pf_cov = pfresults
plt.plot(pf_traj[:,0], 'r.', label='PF Estimates')
plt.plot(true_trajectory, 'b.', alpha=0.5, label='True Trajectory')
plt.title('Particle Filter vs Truth')
plt.xlabel('Sample Number')
plt.legend(bbox_to_anchor=(1.04,1), loc="upper left")
plt.show()
plt.plot(true_trajectory - pf_traj[:,0], 'r.', label='Error')
plt.plot(2*numpy.sqrt(pf_cov[:,0,0]), 'g-', label=r'2 $\sigma$ Covariance')
plt.plot(-2*numpy.sqrt(pf_cov[:,0,0]), 'g-')
plt.title('Particle Filter Error')
plt.xlabel('Sample Number')
plt.legend(bbox_to_anchor=(1.04,1), loc="upper left")
plt.show()
P0 = numpy.array([[2.]])
x0 = numpy.array([4.])
Q = numpy.array([[1.]])
R = numpy.array([[.25]])
alpha = 1.e-4  # sigma-point spread parameter
nx = 1  # state dimension
nv = 1  # process-noise dimension
kappa = 3 - nx
lam = alpha**2.*(nx + kappa) - nx
radical = numpy.sqrt(nx + nv + lam)
beta = 2.  # beta = 2 is the usual choice for Gaussian priors
def getCholesky(P, Q):
    return numpy.linalg.cholesky(P), numpy.linalg.cholesky(Q)
def getColumn(mat, colnum):
    return mat[:,colnum]
def formSigmas(xhat, Sx, Sv):
    # 1 + 2*(nx+nv) sigma points for the noise-augmented state
    sigmas = numpy.zeros((1 + 2*(nx+nv), 1))
    noises = numpy.zeros((1 + 2*(nx+nv), 1))
    sigmas[0,:] = xhat
    noises[0,:] = numpy.zeros(1)
    for i in range(nx):
        sigmas[i+1,:] = xhat + radical*getColumn(Sx, i)
        noises[i+1,:] = numpy.zeros(1)
    for i in range(nx):
        sigmas[i+1+nx,:] = xhat - radical*getColumn(Sx, i)
        noises[i+1+nx,:] = numpy.zeros(1)
    for i in range(nv):
        sigmas[i+1+nx+nx,:] = xhat
        noises[i+1+nx+nx,:] = radical*getColumn(Sv, i)
    for i in range(nv):
        sigmas[i+1+nx+nx+nv,:] = xhat
        noises[i+1+nx+nx+nv,:] = -radical*getColumn(Sv, i)
    return sigmas, noises
def propagateSigmas(sigmas, noise, k):
    new_sigmas = numpy.zeros((len(sigmas), 1))
    for i in range(len(sigmas)):
        new_sigmas[i,:] = expectedUpdate(sigmas[i,:], k) + noise[i,:]
    return new_sigmas
def sigmaMeasurements(sigmas):
    # expected measurement for each sigma point
    # (local name chosen so it doesn't shadow the global measurements array)
    z_sigmas = numpy.zeros(len(sigmas))
    for i in range(len(sigmas)):
        z_sigmas[i] = expectedMeasurement(sigmas[i,0])
    return z_sigmas
A. Weights
B. Weighted Averages
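For reference, these are the standard unscented-transform mean weights as implemented in getMeans below, with the augmented dimension $n_x + n_v$:

$$W_0 = \frac{\lambda}{n_x + n_v + \lambda}, \qquad W_i = \frac{1}{2(n_x + n_v + \lambda)}, \quad i = 1, \dots, 2(n_x + n_v)$$

$$\bar{x} = \sum_{i=0}^{2(n_x+n_v)} W_i \mathcal{X}_i, \qquad \bar{z} = \sum_{i=0}^{2(n_x+n_v)} W_i \mathcal{Z}_i$$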
def getMeans(sigmas, measurements):
    center_weight = lam/(nx + nv + lam)
    other_weight = 1./(2*(nx + nv + lam))
    x_mean = center_weight*sigmas[0,:]
    z_mean = center_weight*measurements[0]
    for i in range(1, len(sigmas)):
        x_mean += other_weight*sigmas[i,:]
        z_mean += other_weight*measurements[i]
    return x_mean, z_mean
A. Weights
B. Covariances
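The covariance weights differ from the mean weights only in the center term, which picks up the $1 - \alpha^2 + \beta$ correction (matching computeCovariances below):

$$W_0^{(c)} = \frac{\lambda}{n_x + n_v + \lambda} + 1 - \alpha^2 + \beta, \qquad W_i^{(c)} = \frac{1}{2(n_x + n_v + \lambda)}$$

$$P_{xx} = \sum_i W_i^{(c)}(\mathcal{X}_i - \bar{x})(\mathcal{X}_i - \bar{x})^T, \qquad P_{xz} = \sum_i W_i^{(c)}(\mathcal{X}_i - \bar{x})(\mathcal{Z}_i - \bar{z})^T, \qquad P_{zz} = \sum_i W_i^{(c)}(\mathcal{Z}_i - \bar{z})(\mathcal{Z}_i - \bar{z})^T$$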
def computeCovariances(xmean, zmean, sigmas, measurements):
    center_weight = lam/(nx + nv + lam) + 1 - alpha**2. + beta
    other_weight = 1./(2*(nx + nv + lam))
    Pxx = center_weight*numpy.dot(numpy.array([sigmas[0,:] - xmean]).T,
                                  numpy.array([sigmas[0,:] - xmean]))
    Pxz = center_weight*numpy.dot(numpy.array([sigmas[0,:] - xmean]).T,
                                  numpy.array([[measurements[0] - zmean]]))
    Pzz = center_weight*numpy.dot(numpy.array([[measurements[0] - zmean]]).T,
                                  numpy.array([[measurements[0] - zmean]]))
    for i in range(1, len(sigmas)):
        Pxx += other_weight*numpy.dot(numpy.array([sigmas[i,:] - xmean]).T,
                                      numpy.array([sigmas[i,:] - xmean]))
        Pxz += other_weight*numpy.dot(numpy.array([sigmas[i,:] - xmean]).T,
                                      numpy.array([[measurements[i] - zmean]]))
        Pzz += other_weight*numpy.dot(numpy.array([[measurements[i] - zmean]]).T,
                                      numpy.array([[measurements[i] - zmean]]))
    return Pxx, Pxz, Pzz
A. Kalman Gain
B. Updates
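Because the measurement noise does not enter the sigma points, $R$ is added to the innovation covariance before forming the gain (matching updateEstimate below):

$$K = P_{xz}(P_{zz} + R)^{-1}$$

$$\hat{x}^{+} = \bar{x} + K(z - \bar{z}), \qquad P^{+} = P_{xx} - K(P_{zz} + R)K^T$$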
def updateEstimate(xmean, Pxx, Pxz, Pzz, zmean, time_index):
    # measurement noise is not in the sigma points, so add R to the
    # innovation covariance before forming the Kalman gain
    S = Pzz + R
    K = numpy.dot(Pxz, pinv(S))
    xnew = xmean + numpy.dot(K, numpy.atleast_1d(measurements[time_index] - zmean))
    Pnew = Pxx - numpy.dot(K, numpy.dot(S, K.T))
    return xnew, Pnew
def runUKF():
    P = P0
    x = x0
    P_array = numpy.zeros((N, 1, 1))
    x_array = numpy.zeros((N, 1))
    P_array[0] = P
    x_array[0,:] = x
    for i in range(N):
        Sx, Sv = getCholesky(P, Q)
        sigmas, noises = formSigmas(x, Sx, Sv)
        propagated_sigmas = propagateSigmas(sigmas, noises, i)
        expected_measurements = sigmaMeasurements(propagated_sigmas)
        xmean, zmean = getMeans(propagated_sigmas, expected_measurements)
        Pxx, Pxz, Pzz = computeCovariances(xmean, zmean, propagated_sigmas,
                                           expected_measurements)
        xnew, Pnew = updateEstimate(xmean, Pxx, Pxz, Pzz, zmean, i)
        P_array[i] = Pnew
        x_array[i,:] = xnew
        P = Pnew
        x = xnew
    return x_array, P_array
ukfresults = runUKF()
ukf_traj, ukf_cov = ukfresults
plt.plot(ukf_traj[:,0], 'r.', label='UKF Estimates')
plt.plot(true_trajectory, 'b.', alpha=0.5, label='True Trajectory')
plt.title('Unscented Kalman Filter vs Truth')
plt.xlabel('Sample Number')
plt.legend(bbox_to_anchor=(1.04,1), loc="upper left")
plt.show()
plt.plot(true_trajectory - ukf_traj[:,0], 'r.', label='Error')
plt.plot(2*numpy.sqrt(ukf_cov[:,0,0]), 'g-', label=r'2 $\sigma$ Covariance')
plt.plot(-2*numpy.sqrt(ukf_cov[:,0,0]), 'g-')
plt.title('Unscented Kalman Filter Error')
plt.xlabel('Sample Number')
plt.legend(bbox_to_anchor=(1.04,1), loc="upper left")
plt.show()
P0 = numpy.array([[2.]])
x0 = numpy.array([4.])
Q = numpy.array([[1.]])
R = numpy.array([[.25]])
def propagateEstimate(state, k):
    return expectedUpdate(state, k)
def getF(state):
    # dynamics Jacobian: d/dx [2*arctan(x)] = 2/(1 + x^2)
    return numpy.array([[2./(1. + state[0]**2.)]])
def getH(state):
    # measurement Jacobian: d/dx [x + x^2 + x^3] = 1 + 2x + 3x^2
    return numpy.array([[1. + 2.*state[0] + 3.*state[0]**2.]])
def propagateCovariance(P, F):
    return numpy.dot(F, numpy.dot(P, F.T)) + Q
def getExpectedMeasurement(state):
    return expectedMeasurement(state)
def propagatePzz(H, P):
    return numpy.dot(H, numpy.dot(P, H.T)) + R
def propagatePxz(H, P):
    return numpy.dot(P, H.T)
def getK(Pxz, Pzz):
    return numpy.dot(Pxz, pinv(Pzz))
def newEstimate(xhat, K, zexp, time_index):
    innovation = measurements[time_index] - zexp
    return xhat + numpy.dot(K, numpy.atleast_1d(innovation))
def newCovariance(P, K, Pzz):
    return P - numpy.dot(K, numpy.dot(Pzz, K.T))
def runEKF():
    P = P0
    x = x0
    P_array = numpy.zeros((N, 1, 1))
    x_array = numpy.zeros((N, 1))
    P_array[0] = P
    x_array[0,:] = x
    for i in range(N):
        x_plus = propagateEstimate(x, i)
        F = getF(x)       # dynamics Jacobian at the pre-propagation estimate
        H = getH(x_plus)  # measurement Jacobian at the predicted state
        P_plus = propagateCovariance(P, F)
        z_plus = getExpectedMeasurement(x_plus)
        Pzz_plus = propagatePzz(H, P_plus)
        Pxz_plus = propagatePxz(H, P_plus)
        K = getK(Pxz_plus, Pzz_plus)
        xnew = newEstimate(x_plus, K, z_plus, i)
        Pnew = newCovariance(P_plus, K, Pzz_plus)
        P_array[i] = Pnew
        x_array[i,:] = xnew
        P = Pnew
        x = xnew
    return x_array, P_array
ekfresults = runEKF()
ekf_traj, ekf_cov = ekfresults
plt.plot(ekf_traj[:,0], 'r.', label='EKF Estimates')
plt.plot(true_trajectory, 'b.', alpha=0.5, label='True Trajectory')
plt.title('Extended Kalman Filter vs Truth')
plt.xlabel('Sample Number')
plt.legend(bbox_to_anchor=(1.04,1), loc="upper left")
plt.show()
plt.plot(true_trajectory - ekf_traj[:,0], 'r.', label='Error')
plt.plot(2*numpy.sqrt(ekf_cov[:,0,0]), 'g-', label=r'2 $\sigma$ Covariance')
plt.plot(-2*numpy.sqrt(ekf_cov[:,0,0]), 'g-')
plt.title('Extended Kalman Filter Error')
plt.xlabel('Sample Number')
plt.legend(bbox_to_anchor=(1.04,1), loc="upper left")
plt.show()
The EKF propagates the error covariance using a Jacobian, which is only a first-order approximation of the nonlinear dynamics. Our dynamics are strongly nonlinear, so the first-order approximation does a poor job of tracking the true error covariance.
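As a quick illustration (a minimal sketch, not part of the assignment), we can compare the arctangent part of the dynamics against its first-order Taylor expansion and watch the approximation degrade as we move away from the linearization point:

def f(x):
    # the nonlinear part of the dynamics (the cosine forcing term is
    # state-independent, so it drops out of the Jacobian)
    return 2*numpy.arctan(x)
def f_lin(x, xlin):
    # first-order Taylor expansion about xlin, using the EKF Jacobian 2/(1 + xlin^2)
    return f(xlin) + (2./(1. + xlin**2.))*(x - xlin)
xlin = 1.0
for dx in [0.1, 0.5, 1.0, 2.0]:
    err = abs(f(xlin + dx) - f_lin(xlin + dx, xlin))
    print('deviation %.1f: linearization error %.4f' % (dx, err))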
The particle filter did the best, but it is far more computationally intensive than either of the other filters.
For a linear system, all three filters would perform approximately as well as one another. In that case, we would be foolish to choose a particle filter: it's extra computation for no improvement in accuracy.
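A rough sense of the cost difference (a back-of-the-envelope sketch under the settings above, counting dynamics-model evaluations per time step rather than wall-clock time):

n_sigma = 1 + 2*(nx + nv)  # sigma points per UKF step
print('PF : %d dynamics evaluations per step' % Ns)
print('UKF: %d dynamics evaluations per step' % n_sigma)
print('EKF: 1 dynamics evaluation plus one 1x1 Jacobian per step')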