diff --git a/foosball/detectors/__init__.py b/foosball/detectors/__init__.py new file mode 100644 index 0000000..37e8e26 --- /dev/null +++ b/foosball/detectors/__init__.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod +from typing import TypeVar, Generic + +from foosball.models import Frame + +DetectorResult = TypeVar('DetectorResult') + + +class Detector(ABC, Generic[DetectorResult]): + def __init__(self, *args, **kwargs): + pass + + @abstractmethod + def detect(self, frame: Frame) -> DetectorResult | None: + pass diff --git a/foosball/detectors/color.py b/foosball/detectors/color.py new file mode 100644 index 0000000..a5562ff --- /dev/null +++ b/foosball/detectors/color.py @@ -0,0 +1,233 @@ +import logging +import os +from abc import ABC +from dataclasses import dataclass +from typing import TypeVar, Generic + +import cv2 +import imutils +import numpy as np +import yaml + +from . import Detector, DetectorResult +from ..models import Frame, DetectedGoals, Point, Goal, Blob, Goals, DetectedBall, HSV + +logger = logging.getLogger(__name__) + +DetectorConfig = TypeVar('DetectorConfig') + + +@dataclass +class BallConfig: + bounds: [HSV, HSV] + invert_frame: bool = False + invert_mask: bool = False + + def store(self): + filename = f"ball.yaml" + print(f"Store config {filename}" + (" " * 50), end="\n\n") + with open(filename, "w") as f: + yaml.dump(self.to_dict(), f) + + @staticmethod + def load(filename='ball.yaml'): + if os.path.isfile(filename): + logging.info("Loading ball config ball.yaml") + with open(filename, 'r') as f: + c = yaml.safe_load(f) + return BallConfig(invert_frame=c['invert_frame'], invert_mask=c['invert_mask'], + bounds=np.array(c['bounds'])) + else: + logging.info("No ball config found") + return None + + def __eq__(self, other): + """Overrides the default implementation""" + if isinstance(other, BallConfig): + return (all([a == b for a, b in zip(self.bounds[0], other.bounds[0])]) and + all([a == b for a, b in zip(self.bounds[1], other.bounds[1])]) and + self.invert_mask == other.invert_mask and + self.invert_frame == other.invert_frame) + return False + + def to_dict(self): + return { + "bounds": [x.tolist() for x in self.bounds], + "invert_frame": self.invert_frame, + "invert_mask": self.invert_mask + } + + + +@dataclass +class GoalConfig: + bounds: [int, int] + invert_frame: bool = True + invert_mask: bool = True + + def store(self): + filename = f"goal.yaml" + print(f"Store config {filename}" + (" " * 50), end="\n\n") + with open(filename, "w") as f: + yaml.dump(self.to_dict(), f) + + @staticmethod + def load(filename='goal.yaml'): + with open(filename, 'r') as f: + c = yaml.safe_load(f) + return GoalConfig(**c) + + def __eq__(self, other): + """Overrides the default implementation""" + if isinstance(other, GoalConfig): + return (all([a == b for a, b in zip(self.bounds, other.bounds)]) and + self.invert_mask == other.invert_mask and + self.invert_frame == other.invert_frame) + return False + + def to_dict(self): + return { + "bounds": self.bounds, + "invert_frame": self.invert_frame, + "invert_mask": self.invert_mask + } + + +class ColorDetector(Generic[DetectorConfig, DetectorResult], Detector[DetectorResult], ABC): + def __init__(self, config: DetectorConfig, *args, **kwargs): + super().__init__(*args, **kwargs) + self.config = config + + +class BallDetector(ColorDetector[BallConfig, DetectedBall]): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def detect(self, frame) -> DetectedBall: + if self.config: + detection_frame = filter_color_range(frame, self.config) + detected_blob = detect_largest_blob(detection_frame) + return DetectedBall(ball=detected_blob, frame=detection_frame) + else: + logger.error("Ball Detection not possible. Config is None") + + +class GoalDetector(ColorDetector[GoalConfig, DetectedGoals]): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @staticmethod + def detect_goal_blobs(img) -> list[Goal] | None: + """ + We take the largest blobs that lay the most to the right and to the left, + assuming that those are our goals + """ + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + + cnts = cv2.findContours(gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + cnts = imutils.grab_contours(cnts) + + # only proceed if at least one contour was found + if len(cnts) > 0: + # find the largest contour in the mask, then use + # it to compute the minimum enclosing circle and + # centroid + largest_contours = sorted(cnts, key=cv2.contourArea, reverse=True)[:2] + if len(largest_contours) != 2: + logger.error("Could not detect 2 goals") + return None + centers_and_bboxes = [transform_contour(cnt) for cnt in largest_contours] + # sort key = x coordinate of the center of mass + blobs_ordered_by_x = [Goal(center=x[0], bbox=x[1]) for x in + sorted(centers_and_bboxes, key=lambda center_bbox: center_bbox[0][0])] + return blobs_ordered_by_x + return None + + def detect(self, frame: Frame) -> DetectedGoals: + if self.config is not None: + detection_frame = filter_gray_range(frame, self.config) + detected_blobs = self.detect_goal_blobs(detection_frame) + if detected_blobs is not None: + return DetectedGoals(goals=Goals(left=detected_blobs[0], right=detected_blobs[1]), + frame=detection_frame) + else: + return DetectedGoals(goals=None, frame=detection_frame) + else: + logger.error("Goal Detection not possible. config is None") + + +def filter_color_range(frame, config: BallConfig) -> Frame: + [lower, upper] = config.bounds + f = frame if not config.invert_frame else cv2.bitwise_not(frame) + + blurred = cv2.GaussianBlur(f, (1, 1), 0) + hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV) + # construct a mask for the chosen color, then perform + # a series of dilations and erosions to remove any small + # blobs left in the simple frame + simple_mask = cv2.inRange(hsv, lower, upper) + # kernel=None <=> 3x3 + simple_mask = cv2.erode(simple_mask, None, iterations=2) + simple_mask = cv2.dilate(simple_mask, None, iterations=2) + + simple_mask = simple_mask if not config.invert_mask else cv2.bitwise_not(simple_mask) + # if verbose: + # self.display.show("dilate", simple, 'bl') + + # ## for masking + # cleaned = mask_img(simple, mask=bar_mask) + # contrast = mask_img(frame, cleaned) + # show("contrast", contrast, 'br') + # show("frame", mask_img(frame, bar_mask), 'tl') + return cv2.bitwise_and(f, f, mask=simple_mask) + + +def filter_gray_range(frame, config: GoalConfig) -> Frame: + try: + [lower, upper] = config.bounds + f = frame if not config.invert_frame else cv2.bitwise_not(frame) + + gray = cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) + + # Create a binary mask using cv2.inRange + mask = cv2.inRange(gray, lower, upper) + + # Apply morphological operations for noise reduction and region connection + kernel = np.ones((3, 3), np.uint8) + dilated_mask = cv2.dilate(mask, kernel, iterations=2) + eroded_mask = cv2.erode(dilated_mask, kernel, iterations=6) + final_mask = eroded_mask if not config.invert_mask else cv2.bitwise_not(eroded_mask) + x = cv2.bitwise_and(f, f, mask=final_mask) + return cv2.dilate(x, kernel, iterations=2) + except Exception as e: + logger.error(f"Exception: {e}\n\n") + return frame + + +def detect_largest_blob(img) -> Blob | None: + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + + cnts = cv2.findContours(gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + cnts = imutils.grab_contours(cnts) + + # only proceed if at least one contour was found + if len(cnts) > 0: + # find the largest contour in the mask, then use + # it to compute the minimum enclosing circle and + # centroid + largest_contour = max(cnts, key=cv2.contourArea) + # ((x, y), radius) = cv2.minEnclosingCircle(c) + center, [x, y, w, h] = transform_contour(largest_contour) + + return Blob(center=center, bbox=[x, y, w, h]) + return None + + +def transform_contour(contour) -> (Point, [int, int, int, int]): + """ + calculate bounding box and center of mass + """ + [x, y, w, h] = cv2.boundingRect(contour) + ms = cv2.moments(contour) + center = (int(ms["m10"] / ms["m00"]), int(ms["m01"] / ms["m00"])) + return center, [x, y, w, h] diff --git a/foosball/models.py b/foosball/models.py index 9911758..0d846b3 100644 --- a/foosball/models.py +++ b/foosball/models.py @@ -61,6 +61,7 @@ def inc(self, team: Team): def to_string(self): return f"{self.blue} : {self.red}" + @dataclass class FrameDimensions: original: [int, int] @@ -79,7 +80,7 @@ def area(self): @dataclass -class BallDetectionResult: +class DetectedBall: ball: Blob frame: np.array @@ -94,86 +95,11 @@ class Goals: @dataclass -class GoalsDetectionResult: +class DetectedGoals: goals: Optional[Goals] frame: np.array -@dataclass -class BallConfig: - bounds: [HSV, HSV] - invert_frame: bool = False - invert_mask: bool = False - - def store(self): - filename = f"ball.yaml" - print(f"Store config {filename}" + (" " * 50), end="\n\n") - with open(filename, "w") as f: - yaml.dump(self.to_dict(), f) - - @staticmethod - def load(filename='ball.yaml'): - if os.path.isfile(filename): - logging.info("Loading ball config ball.yaml") - with open(filename, 'r') as f: - c = yaml.safe_load(f) - return BallConfig(invert_frame=c['invert_frame'], invert_mask=c['invert_mask'], - bounds=np.array(c['bounds'])) - else: - logging.info("No ball config found") - return None - - def __eq__(self, other): - """Overrides the default implementation""" - if isinstance(other, BallConfig): - return (all([a == b for a, b in zip(self.bounds[0], other.bounds[0])]) and - all([a == b for a, b in zip(self.bounds[1], other.bounds[1])]) and - self.invert_mask == other.invert_mask and - self.invert_frame == other.invert_frame) - return False - - def to_dict(self): - return { - "bounds": [x.tolist() for x in self.bounds], - "invert_frame": self.invert_frame, - "invert_mask": self.invert_mask - } - - -@dataclass -class GoalConfig: - bounds: [int, int] - invert_frame: bool = True - invert_mask: bool = True - - def store(self): - filename = f"goal.yaml" - print(f"Store config {filename}" + (" " * 50), end="\n\n") - with open(filename, "w") as f: - yaml.dump(self.to_dict(), f) - - @staticmethod - def load(filename='goal.yaml'): - with open(filename, 'r') as f: - c = yaml.safe_load(f) - return GoalConfig(**c) - - def __eq__(self, other): - """Overrides the default implementation""" - if isinstance(other, GoalConfig): - return (all([a == b for a, b in zip(self.bounds, other.bounds)]) and - self.invert_mask == other.invert_mask and - self.invert_frame == other.invert_frame) - return False - - def to_dict(self): - return { - "bounds": self.bounds, - "invert_frame": self.invert_frame, - "invert_mask": self.invert_mask - } - - Track = collections.deque diff --git a/foosball/sink/opencv.py b/foosball/sink/opencv.py index cfacde4..87e8d23 100644 --- a/foosball/sink/opencv.py +++ b/foosball/sink/opencv.py @@ -6,8 +6,7 @@ import yaml from . import Sink -from foosball.tracking import BallConfig -from foosball.models import GoalConfig +from ..detectors.color import BallConfig, GoalConfig GOAL = "goal" BALL = "ball" diff --git a/foosball/tracking/__init__.py b/foosball/tracking/__init__.py index 5cbb49f..56621c4 100644 --- a/foosball/tracking/__init__.py +++ b/foosball/tracking/__init__.py @@ -10,7 +10,8 @@ from .preprocess import PreProcessor from .render import Renderer from .tracker import Tracker -from ..models import Mask, FrameDimensions, BallConfig, rgb2hsv, GoalConfig +from ..detectors.color import BallConfig, GoalConfig +from ..models import Mask, FrameDimensions, rgb2hsv from ..pipe.Pipe import Pipe diff --git a/foosball/tracking/colordetection.py b/foosball/tracking/colordetection.py deleted file mode 100644 index b3412c3..0000000 --- a/foosball/tracking/colordetection.py +++ /dev/null @@ -1,131 +0,0 @@ -import logging - -import cv2 -import imutils -import numpy as np - -from ..models import BallDetectionResult, Frame, Blob, BallConfig, GoalsDetectionResult, Goals, Point, GoalConfig, Goal -logger = logging.getLogger(__name__) - - -def detect_ball(frame, bounds: BallConfig) -> BallDetectionResult: - if bounds is not None: - detection_frame = filter_color_range(frame, bounds) - detected_blob = detect_largest_blob(detection_frame) - return BallDetectionResult(ball=detected_blob, frame=detection_frame) - else: - logger.error("Ball Detection not possible. Config is None") - - -def detect_goals(frame, config: GoalConfig) -> GoalsDetectionResult: - if config is not None: - detection_frame = filter_gray_range(frame, config) - detected_blobs = detect_goal_blobs(detection_frame) - if detected_blobs is not None: - return GoalsDetectionResult(goals=Goals(left=detected_blobs[0], right=detected_blobs[1]), frame=detection_frame) - else: - return GoalsDetectionResult(goals=None, frame=detection_frame) - else: - logger.error("Goal Detection not possible. config is None") - - -def filter_color_range(frame, bounds: BallConfig) -> Frame: - [lower, upper] = bounds.bounds - f = frame if not bounds.invert_frame else cv2.bitwise_not(frame) - - blurred = cv2.GaussianBlur(f, (1, 1), 0) - hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV) - # construct a mask for the chosen color, then perform - # a series of dilations and erosions to remove any small - # blobs left in the simple frame - simple_mask = cv2.inRange(hsv, lower, upper) - simple_mask = cv2.erode(simple_mask, None, iterations=2) - simple_mask = cv2.dilate(simple_mask, None, iterations=2) - - simple_mask = simple_mask if not bounds.invert_mask else cv2.bitwise_not(simple_mask) - # if verbose: - # self.display.show("dilate", simple, 'bl') - - # ## for masking - # cleaned = mask_img(simple, mask=bar_mask) - # contrast = mask_img(frame, cleaned) - # show("contrast", contrast, 'br') - # show("frame", mask_img(frame, bar_mask), 'tl') - return cv2.bitwise_and(f, f, mask=simple_mask) - - -def filter_gray_range(frame, config: GoalConfig) -> Frame: - try: - [lower, upper] = config.bounds - f = frame if not config.invert_frame else cv2.bitwise_not(frame) - - gray = cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) - - # Create a binary mask using cv2.inRange - mask = cv2.inRange(gray, lower, upper) - - # Apply morphological operations for noise reduction and region connection - kernel = np.ones((3, 3), np.uint8) - dilated_mask = cv2.dilate(mask, kernel, iterations=2) - eroded_mask = cv2.erode(dilated_mask, kernel, iterations=6) - final_mask = eroded_mask if not config.invert_mask else cv2.bitwise_not(eroded_mask) - x = cv2.bitwise_and(f, f, mask=final_mask) - return cv2.dilate(x, kernel, iterations=2) - except Exception as e: - logger.error(f"Exception: {e}\n\n") - return frame - - -def detect_largest_blob(img) -> Blob | None: - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - cnts = cv2.findContours(gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - cnts = imutils.grab_contours(cnts) - - # only proceed if at least one contour was found - if len(cnts) > 0: - # find the largest contour in the mask, then use - # it to compute the minimum enclosing circle and - # centroid - largest_contour = max(cnts, key=cv2.contourArea) - # ((x, y), radius) = cv2.minEnclosingCircle(c) - center, [x,y,w,h] = transform_contour(largest_contour) - - return Blob(center=center, bbox=[x, y, w, h]) - return None - - -def transform_contour(contour) -> (Point, [int, int, int, int]): - """ - calculate bounding box and center of mass - """ - [x, y, w, h] = cv2.boundingRect(contour) - ms = cv2.moments(contour) - center = (int(ms["m10"] / ms["m00"]), int(ms["m01"] / ms["m00"])) - return center, [x, y, w, h] - - -def detect_goal_blobs(img) -> list[Goal] | None: - """ - We take the largest blobs that lay the most to the right and to the left, - assuming that those are our goals - """ - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - - cnts = cv2.findContours(gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - cnts = imutils.grab_contours(cnts) - - # only proceed if at least one contour was found - if len(cnts) > 0: - # find the largest contour in the mask, then use - # it to compute the minimum enclosing circle and - # centroid - largest_contours = sorted(cnts, key=cv2.contourArea, reverse=True)[:2] - if len(largest_contours) != 2: - logger.error("Could not detect 2 goals") - return None - centers_and_bboxes = [transform_contour(cnt) for cnt in largest_contours] - # sort key = x coordinate of the center of mass - blobs_ordered_by_x = [Goal(center=x[0], bbox=x[1]) for x in sorted(centers_and_bboxes, key=lambda center_bbox: center_bbox[0][0])] - return blobs_ordered_by_x - return None diff --git a/foosball/tracking/preprocess.py b/foosball/tracking/preprocess.py index c64d7df..9eb4911 100644 --- a/foosball/tracking/preprocess.py +++ b/foosball/tracking/preprocess.py @@ -6,10 +6,10 @@ import cv2 import numpy as np -from const import CALIBRATION_MODE, VERBOSE, CalibrationMode, OFF -from .colordetection import detect_goals +from const import CalibrationMode, OFF from ..arUcos import calibration, Aruco -from ..models import Frame, PreprocessResult, Point, Rect, GoalConfig, Blob, Goals, FrameDimensions, ScaleDirection, \ +from ..detectors.color import GoalDetector, GoalConfig +from ..models import Frame, PreprocessResult, Point, Rect, Blob, Goals, FrameDimensions, ScaleDirection, \ InfoLog, Info from ..pipe.BaseProcess import BaseProcess, Msg from ..pipe.Pipe import clear @@ -55,7 +55,6 @@ def __init__(self, dims: FrameDimensions, goal_config: GoalConfig, headless=True self.xpad = xpad self.ypad = ypad [self.proc, self.iproc] = generate_processor_switches(useGPU) - self.goal_config = goal_config self.detector, _ = calibration.init_aruco_detector(aruco_dictionary, aruco_params) self.markers = [] self.homography_matrix = None @@ -66,6 +65,7 @@ def __init__(self, dims: FrameDimensions, goal_config: GoalConfig, headless=True self.goals_calibration = self.calibrationMode == CalibrationMode.GOAL self.calibration_out = Queue() if self.goals_calibration else None self.config_in = Queue() if self.goals_calibration else None + self.goal_detector = GoalDetector(goal_config) def config_input(self, config: GoalConfig) -> None: if self.goals_calibration: @@ -97,7 +97,7 @@ def process(self, msg: Msg) -> Msg: try: if self.goals_calibration: try: - self.goal_config = self.config_in.get_nowait() + self.goal_detector.config = self.config_in.get_nowait() except Empty: pass @@ -121,7 +121,7 @@ def process(self, msg: Msg) -> Msg: preprocessed, self.homography_matrix = self.four_point_transform(frame, self.markers) if trigger_marker_detection: # detect goals anew - goals_detection_result = detect_goals(preprocessed, self.goal_config) + goals_detection_result = self.goal_detector.detect(preprocessed) if self.goals_calibration: self.calibration_out.put_nowait(ensure_cpu(goals_detection_result.frame)) # check if goals are not significantly smaller than before diff --git a/foosball/tracking/tracker.py b/foosball/tracking/tracker.py index cca9625..98ea6c4 100644 --- a/foosball/tracking/tracker.py +++ b/foosball/tracking/tracker.py @@ -4,9 +4,9 @@ from queue import Empty from const import CalibrationMode -from .colordetection import detect_ball from .preprocess import WarpMode, project_blob -from ..models import TrackResult, Track, BallConfig, Info, Blob, Goals, InfoLog +from ..detectors.color import BallDetector, BallConfig +from ..models import TrackResult, Track, Info, Blob, Goals, InfoLog from ..pipe.BaseProcess import BaseProcess, Msg from ..pipe.Pipe import clear from ..utils import generate_processor_switches @@ -19,7 +19,7 @@ def log(result: TrackResult) -> None: class Tracker(BaseProcess): - def __init__(self, ball_bounds: BallConfig, useGPU: bool = False, buffer=16, off=False, verbose=False, + def __init__(self, ball_config: BallConfig, useGPU: bool = False, buffer=16, off=False, verbose=False, calibrationMode=None, **kwargs): super().__init__(name="Tracker") self.ball_track = Track(maxlen=buffer) @@ -29,9 +29,8 @@ def __init__(self, ball_bounds: BallConfig, useGPU: bool = False, buffer=16, off [self.proc, self.iproc] = generate_processor_switches(useGPU) # define the lower_ball and upper_ball boundaries of the # ball in the HSV color space, then initialize the - self.ball_bounds = ball_bounds self.ball_calibration = self.calibrationMode == CalibrationMode.BALL - self.calibration_bounds = lambda: self.ball_bounds if self.ball_calibration else None + self.ball_detector = BallDetector(ball_config) self.bounds_in = Queue() if self.ball_calibration else None self.calibration_out = Queue() if self.ball_calibration else None @@ -58,11 +57,11 @@ def get_info(self, ball_track: Track) -> InfoLog: Info(verbosity=0, title="Tracker", value=f"{'off' if self.off else 'on'}") ]) if self.ball_calibration: - [lower, upper] = self.calibration_bounds().bounds + [lower, upper] = self.ball_detector.config.bounds info.append(Info(verbosity=0, title=f"lower", value=f'({",".join(map(str,lower))})')) info.append(Info(verbosity=0, title=f"upper", value=f'({",".join(map(str,upper))})')) - info.append(Info(verbosity=0, title=f"invert frame", value=f'{self.calibration_bounds().invert_frame}')) - info.append(Info(verbosity=0, title=f"invert mask", value=f'{self.calibration_bounds().invert_mask}')) + info.append(Info(verbosity=0, title=f"invert frame", value=f'{self.ball_detector.config.invert_frame}')) + info.append(Info(verbosity=0, title=f"invert mask", value=f'{self.ball_detector.config.invert_mask}')) return info @property @@ -83,11 +82,11 @@ def process(self, msg: Msg) -> Msg: if not self.off: if self.ball_calibration: try: - self.ball_bounds = self.bounds_in.get_nowait() + self.ball_detector.config = self.bounds_in.get_nowait() except Empty: pass f = self.proc(preprocess_result.preprocessed if preprocess_result.preprocessed is not None else preprocess_result.original) - ball_detection_result = detect_ball(f, self.ball_bounds) + ball_detection_result = self.ball_detector.detect(f) ball = ball_detection_result.ball # do not forget to project detected points onto the original frame on rendering if not self.verbose: