Classes - Beard

You are here: start » robot_soccer » 2014 » groups » motion_control_group


|

Meta

Motion Control Group Wiki

Kalman Resource: http://greg.czerniak.info/guides/kalman1/

Matlab to NumPy page: http://wiki.scipy.org/NumPy_for_Matlab_Users

Kalman Filter Implementation:

feel free to email me (Steven) at stevmm@gmail.com if you have questions.

#!/usr/bin/env python 

import numpy
import numpy.linalg as LA
import math

import rospy

from critical_path.msg import OutputFromVision
from critical_path.msg import OutputFromEncoders

def listener():
    filter = KalmanFilter()
    
    rospy.init_node('stateEstimator', anonymous=True)
    
    rospy.Subscriber('vision', OutputFromVision, filter.vision_callback)
    rospy.Subscriber('encoders', OutputFromEncoders, filter.utility_observer)

    rospy.spin()

class KalmanFilter:
    def __init__(self):
        l = 3.048
        w = 1.52
        pi = 3.14156
        self.S0_self = numpy.diag([l*10/12, w*10/12, pi*10/2])
        self.Q_self = numpy.diag([100,100,100])
        self.R_range = .1
        self.R_bearing = 3.14/180
        self.S0_ball = numpy.diag([1,1,1,1,1])
        self.Q_ball = numpy.diag([.010,.010,10,10,.1])
        self.state_self = numpy.array([0, 0, pi/2])#used to be at -l/6, 0, 0
        self.state_S_self = self.S0_self
        self.state_ball = numpy.array([0,0,0,0,.3])
        self.state_S_ball = self.S0_ball
        self.markers = numpy.array([[-l/2,0,l/2,l/2,0,-l/2],[w/2,w/2,w/2,-w/2,-w/2,-w/2]])
        self.control_sample_rate = .1  #check this again?#TODO debate on using dynamic vs constant sample rate
            #and getting the timestamp from ROS
        #self.last_time_called = system.get_time()
        self.OUT_OF_RANGE = -1000
        self.vision = {'marker' : [[self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE], [self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE]], 'ball' : [self.OUT_OF_RANGE, self.OUT_OF_RANGE]}

    def print_self(self):
        print "Current robot position"
        print self.state_self
        print "Current ball position"
        print self.state_ball
    
    def vision_callback(self, data):
        self.vision = {'marker' : [[data.pillar1_range, data.pillar2_range, data.pillar3_range, data.pillar4_range, data.pillar5_range, data.pillar6_range], [data.pillar1_bearing, data.pillar2_bearing, data.pillar3_bearing, data.pillar4_bearing, data.pillar5_bearing, data.pillar6_bearing]], 'ball' : [data.ball_range, data.ball_bearing]}

    #def utility_observer(self, reset_flag=0,v1=numpy.array([0,0,0]),vision={'marker' : [[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]], 'ball' : [13, 14]}):


    def utility_observer(self, data, reset_flag=0):
        #get speeds that encoders sent
        v1 = numpy.array([0,0,0]) #numpy.array([data.xdot, data.ydot, data.thetadot])
        
    
        if reset_flag == 1:
              self.state_self = numpy.array([-3.048/6,0,0])
              self.state_S_self = S0_self
              self.state_ball = numpy.array([0,0,0,0,.3])
              self.state_S_ball = S0_ball
    #----------------------
    # estimate self position and heading
        self.utility_observer_self_update(
            state_self=self.state_self,
            state_S_self=self.state_S_self,
            vision=self.vision,
            v1=v1)
    #----------------------
    # estimate ball position and velocity
        self.utility_observer_ball_update(
            state_ball=self.state_ball, 
            state_S_ball=self.state_S_ball, 
            vision=self.vision,
            state_self=self.state_self,
            state_S_self=self.state_S_self)
            
        self.vision = {'marker' : [[self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE], [self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE, self.OUT_OF_RANGE]], 'ball' : [self.OUT_OF_RANGE, self.OUT_OF_RANGE]}
        
        #print 'state', self.state_self
        rospy.loginfo(self.state_self)


    def utility_observer_self_update(self, state_self, state_S_self, vision, v1):

        # estimate between measurements
        print 'estimate between measurements'
        N = 10
        for i in range(0,N):
            f = v1
            state_self = state_self + (self.control_sample_rate / N) * f
            state_S_self = state_S_self + (self.control_sample_rate / N) * (self.Q_self)

        # measurement updates
        for i in range(0,6):
            # range measurement
            if vision['marker'][0][i] != self.OUT_OF_RANGE:
                print 'range measurement to piller', i+1
                rho = self.markers[:, i] - state_self[0:2]
                Rho = LA.norm(rho)
                if Rho > 0.05:            # this is a hack, but avoids dividing by zero when Rho is small
                    h = Rho
                    C = numpy.array([-rho[0], -rho[1], 0]) / Rho
                    L = numpy.squeeze(numpy.asarray(state_S_self.dot(C.transpose()) / (self.R_range + C.dot(state_S_self).dot(C.transpose()))))
                    
                    
                    state_S_self = (numpy.diag([1,1,1]) - numpy.asmatrix(L).transpose()*C)*(state_S_self)
            #print "l: %s" % L
            #print "vision['marker']: %s" % vision['marker']
            #print "vision['marker'][1]: %s" % vision['marker'][1]
            #print "vision['marker'][1][i]: %s" % vision['marker'][1][i]
            #print "other: %s" % vision['marker'][1][i] -h
                    #print state_self
                    #print L
                    #print vision['marker'][0][i]
                    #print h
                    state_self = state_self + L*(vision['marker'][0][i] - h)
            
            # bearing measurement
            if vision['marker'][1][i] != self.OUT_OF_RANGE:
                print 'bearing measurement from piller:', i+1
                rho = self.markers[:, i] - state_self[0:2]
                Rho = LA.norm(rho)
                if Rho > 0.05:            # this is a hack, but avoids dividing by zero when Rho is small
                    phi = state_self[2]
                    h = math.asin((rho[1] * math.cos(phi) - rho[0] * math.sin(phi)) / Rho)
                    C = numpy.sign(rho[0] * math.cos(phi) + rho[1] * math.sin(phi)) * numpy.array([rho[1] / (Rho ** 2), -rho[0] / (Rho ** 2), -1])
                    L = numpy.squeeze(numpy.asarray(state_S_self.dot(C.transpose()) / (self.R_bearing + C.dot(state_S_self).dot(C.transpose()))))
                    state_S_self = (numpy.diag([1,1,1]) - numpy.asmatrix(L).transpose()*C)*(state_S_self)
                    state_self = state_self + L*(vision['marker'][1][i] - h)
        
        self.state_self = state_self
        self.state_S_self = state_S_self

    def utility_observer_ball_update(self,state_ball, state_S_ball, vision, state_self, state_S_self):
        # re-initialize ball if valid vision data, and ball_hat is behind robot
        if numpy.logical_and(((state_ball[0:2] - state_self[0:2]).transpose().dot(numpy.array([math.cos(state_self[2]), math.sin(state_self[2])])) < 0), (numpy.logical_or((vision['ball'][0] != self.OUT_OF_RANGE), (vision['ball'][1] != self.OUT_OF_RANGE)))):
            state_ball[0:2] = state_self[0:2] + vision['ball'][0]*numpy.array([math.cos(state_self[2] + vision['ball'][1]), math.sin(state_self[2] + vision['ball'][1])])
            rho = state_ball[0:2] - state_self[0:2]
            Rho = LA.norm(rho)
            phi = state_self[2]
            beta = math.asin((rho[1] * math.cos(phi) - rho[0] * math.sin(phi)) / Rho)
            R = numpy.array([[math.cos(2 * beta), -math.sin(2 * beta)], [math.sin(2 * beta), math.cos(2 * beta)]])
            state_ball[2:4] = R.dot(state_ball[2:4])
            state_S_ball = numpy.diag([1, 1, 1, 1, 1])

        # estimate between measurements
        N = 10
        for i in range(0,N):
            # right hand side of differential equation for ball
            # doesn't include bounces off wall (or self)
            f = numpy.array([state_ball[2],state_ball[3], -state_ball[4] * state_ball[2],-state_ball[4]*state_ball[3],  0]).transpose()
            state_ball = state_ball + (self.control_sample_rate / N)*f
            #A = numpy.array([[numpy.zeros((2, 2)), numpy.diag([1,1]), numpy.zeros((2, 1))], [numpy.zeros((2, 2)), -state_ball[4] * numpy.diag([1,1]), -state_ball[2:4]], [numpy.zeros((1, 2)), numpy.zeros((1, 2)), 0]])
            A = numpy.array([[0,0,1,0,0],
                             [0,0,0,1,0],
                             [0,0,-state_ball[4],0,-state_ball[2]],
                             [0,0,0,-state_ball[4],-state_ball[3]],
                             [0,0,0,0,0]])
            state_S_ball = state_S_ball + (self.control_sample_rate / N) * (A.dot(state_S_ball) + state_S_ball.dot(A.transpose()) + self.Q_ball)
        
        
        # measurement updates
        # range measurement
        if vision['ball'][0] != self.OUT_OF_RANGE:
            rho = state_ball[0:2] - state_self[0:2]
            Rho = LA.norm(rho)
            if Rho > .05:        # this is a hack, but avoids dividing by zero when Rho is small
                h = Rho
                C = numpy.array([rho[0], rho[1], 0, 0, 0]) / Rho
                L = numpy.squeeze(numpy.asarray(state_S_ball.dot(C.transpose()) / (self.R_range + C.dot(state_S_ball).dot(C.transpose()))))
                state_S_ball = (numpy.diag([1,1,1,1,1]) - numpy.asmatrix(L).transpose()*C)*(state_S_ball)
                state_ball = state_ball + L * (vision['ball'][0] - h)
        
        # bearing measurement
        if vision['ball'][1] != self.OUT_OF_RANGE:
            rho = state_ball[0:2] - state_self[0:2]
            Rho = LA.norm(rho)
            if Rho > .05:        # this is a hack, but avoids dividing by zero when Rho is small
                phi = state_self[2]
                h = math.asin((rho[1] * math.cos(phi) - rho[0] * math.sin(phi)) / Rho)
                C = numpy.sign(rho[0] * math.cos(phi) + rho[1] * math.sin(phi)) * numpy.array([(-rho[1] / (Rho ** 2)), (rho[0] / (Rho ** 2)), 0, 0, 0])
                L = numpy.squeeze(numpy.asarray(state_S_ball.dot(C.transpose()) / (self.R_bearing + C.dot(state_S_ball).dot(C.transpose()))))
                state_S_ball = (numpy.diag([1,1,1,1,1]) - numpy.asmatrix(L).transpose()*C)*(state_S_ball)
                state_ball = state_ball + L * (vision['ball'][1] - h)
        
        self.state_ball = state_ball
        self.state_S_ball = state_S_ball

if __name__ == "__main__":
    #k = KalmanFilter()
    listener()