Lucas-Kanade tracker

This is an implementation of the (inverse compositional) Lucas-Kanade algorithm.  The Lucas-Kanade algorithm iteratively tries to minimise the difference between the template and a warped image.  The technique can be used for image alignment, tracking, optic flow analysis, and motion estimation.  Possible improvements are to incorporate illumination changes and a proper threatment of the image boundaries.

The example offers five different models

1shift
2shift and scale
3shift and rotation
4affine transform
52-d homography

See also

#!/usr/bin/env ruby
require 'matrix'
require 'hornetseye'
require 'matrix_fix'
include Hornetseye
syntax = <<END_OF_STRING
Align images using phase correlation
Syntax: lktracker.rb <video> <backgr.> <model> <width> <height> <p1> <p2> ...
Examples:
  ./lktracker.rb ../../data/videos/polygon.avi shift 94 65 80 35
  ./lktracker.rb ../../data/videos/polygon.avi shift-n-scale 94 65 80 35 1 1
  ./lktracker.rb ../../data/videos/polygon.avi isometry 94 65 80 35 0
  ./lktracker.rb ../../data/videos/polygon.avi rotate-n-scale 94 65 80 35 0 1
  ./lktracker.rb ../../data/videos/polygon.avi affine 94 65 80 35 1 0 0 1
  ./lktracker.rb ../../data/videos/polygon.avi homography 94 65 1 0 0 1 80 35 0 0
END_OF_STRING
if ARGV.size < 2
  puts syntax
  raise "Wrong number of command-line arguments"
end
class MultiArray
  class << self
    def ramp1( *shape )
      int( *shape ).indgen! % shape[0]
    end
    def ramp2( *shape )
      int( *shape ).indgen! / shape[0]
    end
  end
end
class Sequence_
  def warp_clipped_interpolate( field )
    shape = field.shape[ 1 .. -1 ]
    field00 = field.floor.to_int
    field01 = field00.dup
    field01.roll[ 0 ] += 1
    field10 = field00.dup
    field10.roll[ 1 ] += 1
    field11 = field00.dup
    field11 = field11 + 1
    fieldfrac = field - field00
    frac00 = fieldfrac.roll[ 0 ]
    frac01 = 1 - fieldfrac.roll[ 0 ]
    frac10 = fieldfrac.roll[ 1 ]
    frac11 = 1 - fieldfrac.roll[ 1 ]
    return warp_clipped( field00 ) * frac01 * frac11 +
      warp_clipped( field01 ) * frac00 * frac11 +
      warp_clipped( field10 ) * frac01 * frac10 +
      warp_clipped( field11 ) * frac00 * frac10
  end
end
input = XineInput.new ARGV[0]
if 1.0.inspect == "1,0"
  # Should be 1.0 (not 1,0). Try to fix by reseting locale.
  # You can get locale for Ruby from
  # http://sourceforge.net/projects/ruby-locale/
  require 'locale'
  Locale::setlocale( Locale::LC_ALL, "C" )
end
w = ARGV[2].to_i
h = ARGV[3].to_i
case ARGV[1]
when "shift"
  raise "Shifting model requires 2 parameters" if ARGV.size != 6
  p = Vector[ *ARGV[4...6].collect { |a| a.to_f } ]
  def model( p, x, y )
    Vector[ x + p[0], y + p[1] ]
  end
  def derivative( x, y ) # derivative at p = [ 0, 0 ]
    Matrix[ [ 1, 0 ], [ 0, 1 ] ]
  end
  def compose( p, d )
    p + d
  end
when "shift-n-scale"
  raise "Shift-and-scale model requires 4 parameters" if ARGV.size != 8
  p = Vector[ *ARGV[4...8].collect { |a| a.to_f } ]
  def model( p, x, y )
    Vector[ x * p[2] + p[0], y * p[3] + p[1] ]
  end
  def derivative( x, y ) # derivative at p = [ 0, 0, 1, 1 ]
    Matrix[ [ 1, 0 ], [ 0, 1 ], [ x, 0 ], [ 0, y ] ]
  end
  def compose( p, d )
    p + Vector[ p[2] * d[0], p[3] * d[1], p[2] * d[2], p[3] * d[3] ]
  end
when "isometry"
  raise "Isometric model requires 3 parameters" if ARGV.size != 7
  p = Vector[ *ARGV[4...7].collect { |a| a.to_f } ]
  def model( p, x, y )
    cw, sw = Math::cos( p[2] ), Math::sin( p[2] )
    Vector[ x * cw - y * sw + p[0], x * sw + y * cw + p[1] ]
  end
  def derivative( x, y ) # derivative at p = [ 0, 0, 0 ]
    Matrix[ [ 1, 0 ], [ 0, 1 ], [ -y, x ] ]
  end
  def compose( p, d )
    cw, sw = Math::cos( p[2] ), Math::sin( p[2] )
    p + Matrix[ [  cw, -sw, 0 ],
                [  sw,  cw, 0 ],
                [   0,   0, 1 ] ] * d
  end
when "rotate-n-scale"
  raise "Isometric model requires 3 parameters" if ARGV.size != 8
  p = Vector[ *ARGV[4...8].collect { |a| a.to_f } ]
  def model( p, x, y )
    cw, sw, s = Math::cos( p[2] ), Math::sin( p[2] ), p[3]
    Vector[ x * cw * s - y * sw * s + p[0], x * sw * s + y * cw * s + p[1] ]
  end
  def derivative( x, y ) # derivative at p = [ 0, 0, 0 ]
    Matrix[ [ 1, 0 ], [ 0, 1 ], [ -y, x ], [ x, y ] ]
  end
  def compose( p, d )
    cw, sw, s = Math::cos( p[2] ), Math::sin( p[2] ), p[3]
    p + Matrix[ [  cw, -sw, 0, 0 ],
                [  sw,  cw, 0, 0 ],
                [   0,   0, 1, 0 ],
                [   0,   0, 0, 1 ] ] * d
  end
when "affine"
  raise "Affine model requires 6 parameters" if ARGV.size != 10
  p = Vector[ *ARGV[4...10].collect { |a| a.to_f } ]
  def model( p, x, y )
    Vector[ x * p[2] + y * p[4] + p[0], x * p[3] + y * p[5] + p[1] ]
  end
  def derivative( x, y ) # derivative at p = [ 0, 0, 1, 0, 0, 1 ]
    Matrix[ [ 1, 0 ], [ 0, 1 ], [ x, 0 ], [ 0, x ], [ y, 0 ], [ 0, y ] ]
  end
  def compose( p, d )
    p + Matrix[ [ p[2], p[4],    0,    0,    0,    0 ],
                [ p[3], p[5],    0,    0,    0,    0 ],
                [    0,    0, p[2], p[4],    0,    0 ],
                [    0,    0, p[3], p[5],    0,    0 ],
                [    0,    0,    0,    0, p[2], p[4] ],
                [    0,    0,    0,    0, p[3], p[5] ] ] * d
  end
when "homography"
  raise "Homography requires 8 parameters" if ARGV.size != 12
  p = Vector[ *ARGV[4...12].collect { |a| a.to_f } ]
  def model( p, x, y )
    h = Matrix[ [ p[0], p[2], p[4] ],
                [ p[1], p[3], p[5] ],
                [ p[6], p[7],    1 ] ]
    rh = h * Vector[ x, y, 1 ]
    Vector[ rh[0] / rh[2], rh[1] / rh[2] ]
  end
  def derivative( x, y )
    Matrix[
      [ x, 0 ], [ 0, x ], [ y, 0 ], [ 0, y ], [ 1, 0 ], [ 0, 1 ],
      [ -x * x, -x * y ], [ -y * x, -y * y ] ]
  end
  def compose( p, d )
    h = Matrix[ [ p[0], p[2], p[4] ],
                [ p[1], p[3], p[5] ],
                [ p[6], p[7],    1 ] ]
    hd = Matrix[ [ 1 + d[0],     d[2], d[4] ],
                 [     d[1], 1 + d[3], d[5] ],
                 [     d[6],     d[7],    1 ] ]
    hr = h * hd
    hr = hr / hr[2,2]
    Vector[ hr[0,0], hr[1,0], hr[0,1], hr[1,1],
      hr[0,2], hr[1,2], hr[2,0], hr[2,1] ]
  end
else
  raise "No such model (#{ARGV[1]})"
end
class PWarp
  def initialize( x, y, w, h )
    @w, @h, @x, @y = w, h, x, y
    @field = MultiArray.sfloat( 2, @w, @h )
  end
  def warp( img, p )
    @field[ 0, 0...@w, 0...@h ], @field[ 1, 0...@w, 0...@h ] =
      *model( p, @x, @y )
    img.warp_clipped_interpolate @field
  end
end
img = input.read_ubyte
sigma = 2.5
# compute gradient on a larger field and crop later to avoid fringe effects.
b = ( Array.gauss_gradient_filter( sigma ).size - 1 ) / 2
x = MultiArray.ramp1( w + 2 * b, h + 2 * b ) - b
y = MultiArray.ramp2( w + 2 * b, h + 2 * b ) - b
tpl = PWarp.new( x, y, w + 2 * b, h + 2 * b ).warp( img, p )
gx = tpl.gauss_gradient( sigma, 0 )
gy = tpl.gauss_gradient( sigma, 1 )
tpl, gx, gy, x, y = *( [ tpl, gx, gy, x, y ].
                       collect { |arr| arr[ b...(w+b), b...(h+b) ] } )
c = derivative( x, y ) * Vector[ gx, gy ]
hs = ( c * c.covector ).collect { |e| e.sum }
hsinv = hs.inverse
display = X11Display.new
# output = OpenGLOutput.new
# output = XVideoOutput.new
output = XImageOutput.new
window = X11Window.new( display, output, img.shape[0], img.shape[1] )
window.title = "Lucas-Kanade tracker"
window.show
while input.status? and output.status?
  img = input.read_ubyte
  for i in 0...5
    diff = PWarp.new( x, y, w, h ).warp( img, p ) - tpl
    s = c.collect { |e| ( e * diff ).sum }
    p = compose( p, hsinv * s )
  end
  gc = Magick::Draw.new
  gc.fill_opacity( 0 )
  gc.stroke( 'red' ).stroke_width( 1 )
  gc.line( *( model( p, 0, 0 ).to_a + model( p, w, 0 ).to_a ) )
  gc.line( *( model( p, 0, h ).to_a + model( p, w, h ).to_a ) )
  gc.line( *( model( p, 0, 0 ).to_a + model( p, 0, h ).to_a ) )
  gc.line( *( model( p, w, 0 ).to_a + model( p, w, h ).to_a ) )
  gc.circle( *( model( p, 0, 0 ).to_a + model( p, 3, 0 ).to_a ) )
  result = img.to_ubytergb.to_magick
  gc.draw( result )
  result = result.to_multiarray
  output.write result
  display.processEvents
end
Close