Skip to content

Commit

Permalink
Merge pull request #1 from illini-robomaster/roger/basic_tracker
Browse files Browse the repository at this point in the history
Vision software v0.0.1 Release
  • Loading branch information
jialinh4 authored Feb 27, 2023
2 parents 2446f36 + f4c2be1 commit 40d38fe
Show file tree
Hide file tree
Showing 14 changed files with 473 additions and 112 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
51 changes: 31 additions & 20 deletions Aiming/Aim.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@
import config
import Utils

from .Tracking import basic_tracker

class Aim:
def __init__(self):
pass
def __init__(self, config):
self.CFG = config
self.tracker = basic_tracker(self.CFG)

def process_one(self, pred_list, enemy_team, depth_map):
def process_one(self, pred_list, enemy_team, rgb_img):
assert enemy_team in ['blue', 'red']
closet_pred, closet_dist = self.get_closet_pred(pred_list, enemy_team, depth_map)

# TODO: use assertion to check enemy_team

final_bbox_list, final_id_list = self.tracker.process_one(pred_list, enemy_team, rgb_img)

# TODO: integrate this into tracking for consistent tracking
closet_pred, closet_dist = self.get_closet_pred(final_bbox_list, rgb_img)

if closet_pred is None:
return None
name, confidence, bbox = closet_pred
center_x, center_y, width, height = bbox
center_x, center_y, width, height = closet_pred

# Get yaw/pitch differences in radians
yaw_diff, pitch_diff = self.get_rotation_angle(center_x, center_y)
Expand All @@ -24,6 +33,8 @@ def process_one(self, pred_list, enemy_team, depth_map):
'pitch_diff': calibrated_pitch_diff,
'center_x': center_x,
'center_y': center_y,
'final_bbox_list': final_bbox_list,
'final_id_list': final_id_list,
}

def posterior_calibration(self, yaw_diff, pitch_diff, distance):
Expand All @@ -46,30 +57,30 @@ def posterior_calibration(self, yaw_diff, pitch_diff, distance):
# TODO: compute a range table
return (yaw_diff, pitch_diff)

def get_closet_pred(self, pred_list, enemy_team, depth_map):
def get_closet_pred(self, bbox_list, rgb_img):
'''Get the closet prediction to camera focal point'''
# TODO: instead of camera focal point; calibrate closet pred to operator view
H, W, C = rgb_img.shape
focal_y = H / 2
focal_x = W / 2
closet_pred = None
closet_dist = None # Cloest to camera in z-axis
obj_of_interest = [f"armor_{enemy_team}"]
for name, conf, bbox in pred_list:
# name from C++ string is in bytes; decoding is needed
if isinstance(name, bytes):
name_str = name.decode('utf-8')
else:
name_str = name
if name_str not in obj_of_interest: continue
cur_dist = Utils.estimate_target_depth(bbox, depth_map)
closet_dist = 99999999
for bbox in bbox_list:
center_x, center_y, width, height = bbox
cur_dist = (center_x - focal_x)**2 + (center_y - focal_y)**2
if closet_pred is None:
closet_pred = (name, conf, bbox)
closet_pred = bbox
closet_dist = cur_dist
else:
if cur_dist < closet_dist:
closet_pred = (name, conf, bbox)
closet_pred = bbox
closet_dist = cur_dist
return closet_pred, closet_dist

@staticmethod
def get_rotation_angle(bbox_center_x, bbox_center_y):
yaw_diff = (bbox_center_x - config.IMG_CENTER_X) * (config.RGBD_CAMERA.YAW_FOV_HALF / config.IMG_CENTER_X)
pitch_diff = (bbox_center_y - config.IMG_CENTER_Y) * (config.RGBD_CAMERA.PITCH_FOV_HALF / config.IMG_CENTER_Y)
yaw_diff = (bbox_center_x - config.IMG_CENTER_X) * (config.AUTOAIM_CAMERA.YAW_FOV_HALF / config.IMG_CENTER_X)
pitch_diff = (bbox_center_y - config.IMG_CENTER_Y) * (config.AUTOAIM_CAMERA.PITCH_FOV_HALF / config.IMG_CENTER_Y)

return yaw_diff, pitch_diff
2 changes: 1 addition & 1 deletion Aiming/Tracking/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
#placeholder
from .basic_tracker import basic_tracker
112 changes: 112 additions & 0 deletions Aiming/Tracking/basic_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import os
import scipy.optimize
import numpy as np

from .consistent_id_gen import ConsistentIdGenerator

from IPython import embed

# TODO: move this to config
# Number of most-recent observations retained per tracked armor.
FRAME_BUFFER_SIZE = 10

class tracked_armor(object):
    """State of a single tracked armor plate.

    Keeps short rolling histories of bounding boxes, image ROIs and the
    frame ticks at which the armor was observed, so the tracker can
    associate detections across frames and linearly extrapolate motion.
    """

    def __init__(self, bbox, roi, frame_tick, armor_id):
        """Initialize from a first observation.

        Args:
            bbox: (center_x, center_y, width, height) of the detection.
            roi: image crop of the armor (kept for future appearance cost).
            frame_tick: frame counter at which this armor was observed.
            armor_id: unique ID (-1 until assigned by the tracker).
        """
        self.bbox_buffer = [bbox]
        self.roi_buffer = [roi]
        self.observed_frame_tick = [frame_tick]
        self.armor_id = armor_id # unique ID

    def compute_cost(self, other_armor):
        """Return squared pixel distance between the two latest bbox centers."""
        assert isinstance(other_armor, tracked_armor)
        # TODO: use more sophisticated metrics (e.g., RGB) as cost function
        c_x, c_y, w, h = self.bbox_buffer[-1]
        o_c_x, o_c_y, o_w, o_h = other_armor.bbox_buffer[-1]
        return np.square(c_x - o_c_x) + np.square(c_y - o_c_y)

    def update(self, other_armor, frame_tick):
        """Absorb a matched observation taken at frame_tick.

        Only call if these two armors were matched by the tracker.
        """
        self.bbox_buffer.append(other_armor.bbox_buffer[-1])
        self.roi_buffer.append(other_armor.roi_buffer[-1])
        self.observed_frame_tick.append(frame_tick)

        # Maintain each armor's buffers so that anything older than
        # FRAME_BUFFER_SIZE is dropped.
        # BUGFIX: observed_frame_tick was previously never truncated, so it
        # grew without bound and its forward indices drifted out of
        # alignment with the other two buffers.
        self.bbox_buffer = self.bbox_buffer[-FRAME_BUFFER_SIZE:]
        self.roi_buffer = self.roi_buffer[-FRAME_BUFFER_SIZE:]
        self.observed_frame_tick = self.observed_frame_tick[-FRAME_BUFFER_SIZE:]

    def predict_bbox(self, cur_frame_tick):
        """Predict the bbox at cur_frame_tick.

        Returns the last observed bbox unchanged when the armor was seen at
        this very tick or when only one observation exists; otherwise
        linearly extrapolates the center from the last two observations
        (width/height are kept from the latest observation).
        """
        if cur_frame_tick == self.observed_frame_tick[-1] or len(self.bbox_buffer) == 1:
            return self.bbox_buffer[-1]
        else:
            # Linear extrapolation
            c_x, c_y, w, h = self.bbox_buffer[-1]
            o_c_x, o_c_y, o_w, o_h = self.bbox_buffer[-2]
            delta_tick = self.observed_frame_tick[-1] - self.observed_frame_tick[-2]
            new_delta_tick = cur_frame_tick - self.observed_frame_tick[-1]
            delta_x = (c_x - o_c_x) * new_delta_tick / delta_tick
            delta_y = (c_y - o_c_y) * new_delta_tick / delta_tick
            return (int(c_x + delta_x), int(c_y + delta_y), w, h)

class basic_tracker(object):
    '''
    Frame-to-frame armor tracker.

    Associates fresh detections with previously tracked armors via a
    Hungarian assignment on squared center distance, memorizes recent
    observations, and linearly extrapolates armors not re-observed in the
    current frame.
    '''
    SE_THRESHOLD = 3200 # (40, 40) pixels away

    def __init__(self, config):
        self.CFG = config
        self.active_armors = []
        self.id_gen = ConsistentIdGenerator()
        self.frame_tick = 0 # TODO: use timestamp may be a better idea

    def process_one(self, pred_list, enemy_team, rgb_img):
        """Ingest one frame of detections; return (bbox_list, id_list)."""
        # Wrap every raw detection into a candidate armor with its ROI crop.
        candidates = []
        for _name, _conf, bbox in pred_list:
            c_x, c_y, w, h = bbox
            x0, x1 = int(c_x - w / 2), int(c_x + w / 2)
            y0, y1 = int(c_y - h / 2), int(c_y + h / 2)
            crop = rgb_img[y0:y1, x0:x1]
            candidates.append(tracked_armor(bbox, crop, self.frame_tick, -1))

        if self.active_armors:
            # Hungarian matching: candidates are rows, tracked armors columns.
            cost = np.zeros((len(candidates), len(self.active_armors)), dtype=float)
            for i, cand in enumerate(candidates):
                for j, tracked in enumerate(self.active_armors):
                    cost[i, j] = cand.compute_cost(tracked)
            rows, cols = scipy.optimize.linear_sum_assignment(cost)

            # Accept only assignments that are spatially close enough.
            for i, j in zip(rows, cols):
                if cost[i, j] < self.SE_THRESHOLD:
                    assert candidates[i] is not None
                    self.active_armors[j].update(candidates[i], self.frame_tick)
                    candidates[i] = None

            candidates = [c for c in candidates if c is not None]

            # Unassociated candidates get fresh unique IDs.
            for cand in candidates:
                cand.armor_id = self.id_gen.get_id()

            # Drop armors not observed within the last FRAME_BUFFER_SIZE frames.
            self.active_armors = [a for a in self.active_armors
                                  if self.frame_tick - a.observed_frame_tick[-1] < FRAME_BUFFER_SIZE]

        # Register the remaining candidates as new active armors.
        self.active_armors.extend(candidates)

        # Report every active armor: observed bbox if seen this tick,
        # otherwise a linear extrapolation.
        ret_bbox_list = [a.predict_bbox(self.frame_tick) for a in self.active_armors]
        ret_id_list = [a.armor_id for a in self.active_armors]

        self.frame_tick += 1

        return ret_bbox_list, ret_id_list
17 changes: 17 additions & 0 deletions Aiming/Tracking/consistent_id_gen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import threading

class ConsistentIdGenerator(object):
    """Monotonically increasing integer ID source (1, 2, 3, ...).

    Pass lock=True to make get_id() safe to call from multiple threads;
    by default no lock is taken.
    """

    def __init__(self, lock=False):
        self.lock_flag = lock
        # Allocate a Lock only when thread-safety was requested.
        if self.lock_flag:
            self._lock = threading.Lock()
        self._id = 0

    def get_id(self):
        """Return the next unused integer ID."""
        if not self.lock_flag:
            self._id += 1
            return self._id
        with self._lock:
            self._id += 1
            return self._id
File renamed without changes.
53 changes: 53 additions & 0 deletions Camera/read_from_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Read from file
import sys
import time
import numpy as np
import cv2
import Utils

# TODO: make an abstract camera base class
class fake_camera(object):
    """Camera stand-in that replays frames from an .mp4 named on argv.

    Mimics the live-camera interface for offline development: frames are
    read sequentially, resized to (width, height), and throughput stats are
    printed once the video is exhausted.
    """

    # Needs to be calibrated per camera
    YAW_FOV_HALF = Utils.deg_to_rad(42) / 2
    PITCH_FOV_HALF = Utils.deg_to_rad(42) / 2

    def __init__(self, width, height):
        """Open the video file given as sys.argv[1].

        Args:
            width: output frame width in pixels.
            height: output frame height in pixels.
        """
        self.width = width
        self.height = height

        assert len(sys.argv) == 2  # main py file; video file
        video_path = sys.argv[1]
        assert video_path.endswith('.mp4')

        self.cap = cv2.VideoCapture(video_path)
        assert self.cap.isOpened()

        self.alive = True  # init to True

        # Timing and frame counter are always in place for development purpose
        self.timing = None
        self.frame_cnt = 0

    def get_frame(self):
        """Return the next frame resized to (width, height).

        Raises:
            Exception: when the video file is exhausted (after printing
                frame-count / elapsed-time / FPS statistics).
        """
        if self.timing is None:
            # Start the FPS clock on the first call.
            self.timing = time.time()

        if not self.cap.isOpened():
            self.alive = False

        ret, frame = self.cap.read()

        if not ret:
            self.alive = False

        if not self.alive:
            # BUGFIX: sample the clock once so "Total time" and "FPS" are
            # computed from the same instant (previously three separate
            # time.time() calls gave slightly inconsistent numbers).
            elapsed = time.time() - self.timing
            print("Total frames: {}".format(self.frame_cnt))
            print("Total time: {}".format(elapsed))
            print("FPS: {}".format(self.frame_cnt / elapsed))
            raise Exception("Video file exhausted")

        frame = cv2.resize(frame, (self.width, self.height))

        self.frame_cnt += 1

        return frame
33 changes: 33 additions & 0 deletions Camera/shl_200ws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import numpy as np
import cv2
import time

# Manual test script for the SHL-200WS camera: grabs frames from
# /dev/video0 through a GStreamer MJPEG pipeline (640x480) and displays
# them until 'q' is pressed.
cap = cv2.VideoCapture('v4l2src device=/dev/video0 ! image/jpeg,framerate=61612/513,width=640,height=480 ! jpegdec ! videoconvert ! video/x-raw,format=BGR ! appsink max-buffers=1 drop=true', cv2.CAP_GSTREAMER)

if not cap.isOpened():
    print("Cannot open camera")
    exit()

while True:
    start_cp = time.time()
    # Capture frame-by-frame
    ret, frame = cap.read()
    # if frame is read correctly ret is True
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break
    print(frame.shape)
    # Our operations on the frame come here
    # Display the resulting frame
    cv2.imshow('frame', frame)
    my_key = cv2.waitKey(1)
    if my_key == ord('q'):
        break
    elif my_key == ord('s'):
        # 's' saves a snapshot for later inspection/calibration.
        cv2.imwrite('test.jpg', frame)
    # NOTE(review): start_cp resets every iteration, so despite the wording
    # this prints the per-frame latency, not a running total.
    print("Total time elapsed: {:.4f}".format(time.time() - start_cp))

# When everything done, release the capture

cap.release()
cv2.destroyAllWindows()
4 changes: 2 additions & 2 deletions Communication/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def create_packet(header, seq_num, yaw_offset, pitch_offset):
assert seq_num >= 0 and seq_num < 2**32 - 1 # uint32
packet += seq_num.to_bytes(4, 'big')
# YAW/PITCH offset should not be too high
assert yaw_offset >= -config.RGBD_CAMERA.YAW_FOV_HALF and yaw_offset <= config.RGBD_CAMERA.YAW_FOV_HALF
assert pitch_offset >= -config.RGBD_CAMERA.PITCH_FOV_HALF and pitch_offset <= config.RGBD_CAMERA.PITCH_FOV_HALF
assert yaw_offset >= -config.AUTOAIM_CAMERA.YAW_FOV_HALF and yaw_offset <= config.AUTOAIM_CAMERA.YAW_FOV_HALF
assert pitch_offset >= -config.AUTOAIM_CAMERA.PITCH_FOV_HALF and pitch_offset <= config.AUTOAIM_CAMERA.PITCH_FOV_HALF
discrete_yaw_offset = int(yaw_offset * 100000)
discrete_pitch_offset = int(pitch_offset * 100000)
packet += (discrete_yaw_offset & 0xFFFFFFFF).to_bytes(4, 'big')
Expand Down
Loading

0 comments on commit 40d38fe

Please sign in to comment.