Source code for n_fold_edge.marker_tracker

"""Track markers in video."""

import csv
import math
import signal
from collections.abc import Sequence
from pathlib import Path
from time import strftime
from typing import Any

import cv2
import numpy as np

from n_fold_edge.marker_locator import MarkerLocator
from n_fold_edge.marker_pose import MarkerPose


class MarkerTracker:
    """
    Track markers in video file or video stream.

    Constructing a tracker installs a SIGINT handler so that Ctrl-C stops a
    running :meth:`track` loop gracefully instead of raising
    ``KeyboardInterrupt``.

    Parameters
    ----------
    marker_locators : MarkerLocator | Sequence[MarkerLocator]
        One or more MarkerLocator instances for the markers to track.
    show_video : bool
        Whether to show the video as the markers are being tracked.
    """

    def __init__(self, marker_locators: MarkerLocator | Sequence[MarkerLocator], show_video: bool = False) -> None:
        # Normalise to a sequence so the tracking loop can always iterate.
        if isinstance(marker_locators, Sequence):
            self.marker_locators = marker_locators
        else:
            self.marker_locators = (marker_locators,)
        self.show_video = show_video
        # Set by the SIGINT handler; polled once per frame by the tracking loop.
        self.stop_flag = False
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum: int, frame: Any) -> None:
        """Request the tracking loop to stop (installed as the SIGINT handler)."""
        self.stop_flag = True

    def _handle_keyboard_events(self, frame: np.ndarray) -> bool:
        """
        Poll for stop requests and keyboard shortcuts.

        Parameters
        ----------
        frame : np.ndarray
            The current (annotated) frame; written to disk when ``s`` is pressed.

        Returns
        -------
        bool
            True if tracking should stop (SIGINT received, or Esc/``q`` pressed
            while the video is shown), otherwise False.
        """
        if self.stop_flag:
            return True
        if self.show_video:
            # Keep only the low byte of the key code; some platforms set
            # higher-order bits on the value returned by waitKey.
            key = cv2.waitKey(1) & 0xFF
            if key in (27, ord("q")):  # Esc / q
                return True
            if key == ord("s"):  # s: save the current frame as an image
                print("Saving image")
                # imwrite does not create missing directories, so make sure
                # the target directory exists before writing.
                output_dir = Path("output")
                output_dir.mkdir(parents=True, exist_ok=True)
                filename = strftime("%Y-%m-%d %H-%M-%S")
                cv2.imwrite(str(output_dir / f"{filename}.png"), frame)
        return False

    def _draw_detected_markers(self, frame: np.ndarray, marker_pose: MarkerPose) -> np.ndarray:
        """
        Draw a circle at the marker position and a line showing its orientation.

        Parameters
        ----------
        frame : np.ndarray
            Image to draw on (modified in place).
        marker_pose : MarkerPose
            Pose providing ``x``, ``y``, ``theta`` and ``quality``.

        Returns
        -------
        np.ndarray
            The same frame, with the marker drawn on it.
        """
        xm = int(marker_pose.x)
        ym = int(marker_pose.y)
        orientation = marker_pose.theta
        # Markers with quality below 0.9 get a thin outline, the rest a thick one.
        thickness = 1 if marker_pose.quality < 0.9 else 3
        cv2.circle(frame, (xm, ym), 4, (55, 55, 255), thickness)
        # 50 pixel long line from the marker centre along its orientation.
        xm2 = int(xm + 50 * math.cos(orientation))
        ym2 = int(ym + 50 * math.sin(orientation))
        cv2.line(frame, (xm, ym), (xm2, ym2), (255, 0, 0), 2)
        return frame

    def _prepare_csv_file(self, csv_file_path: Path) -> None:
        """
        Create the csv file and write the header row.

        Parameters
        ----------
        csv_file_path : Path
            Location of the csv file to create.

        Raises
        ------
        FileExistsError
            If ``csv_file_path`` already exists.
        """
        try:
            # "x" creates the file atomically and fails if it already exists,
            # which avoids a check-then-open race. newline="" lets the csv
            # module control line endings, as its documentation requires.
            csv_file = open(csv_file_path, "x", newline="")
        except FileExistsError as err:
            raise FileExistsError("csv file already exist. \nChoose another filename.") from err
        with csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(["Frame", "x", "y", "direction", "quality", "order"])

    def _save_marker_to_csv(self, csv_file_path: Path, marker_pose: MarkerPose, frame_number: float) -> None:
        """
        Append one marker observation to the csv file.

        Parameters
        ----------
        csv_file_path : Path
            The csv file to append to.
        marker_pose : MarkerPose
            Pose whose ``as_list()`` values are written after the frame number.
        frame_number : float
            Frame number the observation belongs to.
        """
        # newline="" lets the csv module control line endings.
        with open(csv_file_path, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow([frame_number] + marker_pose.as_list())

    def track(self, video: Path | int, save_video_path: Path | None = None, save_csv_path: Path | None = None) -> None:
        """
        Start tracking of markers in video.

        Parameters
        ----------
        video : Path | int
            Path to a video or a id if webcam is used.
        save_video_path : Path | None
            Path to save video with detected markers drawn on if desired.
        save_csv_path : Path | None
            Path to save found marker location in csv format if desired.

        Raises
        ------
        OSError
            If the video source cannot be opened, or if a webcam is used and
            ``save_video_path`` does not have the ``.avi`` extension.
        FileExistsError
            If ``save_csv_path`` already exists.
        """
        # Hand OpenCV a plain string rather than a Path object.
        source = str(video) if isinstance(video, Path) else video
        cap = cv2.VideoCapture(source)
        video_out = None
        try:
            # Validate everything before any output file is created, so a
            # failed start does not leave a stray csv or video file behind.
            if not cap.isOpened():
                raise OSError(f"Could not open video {video}.")
            from_webcam = isinstance(video, int)
            if save_video_path is not None and from_webcam and save_video_path.suffix != ".avi":
                raise OSError("Only video with avi extension is supported as output.")

            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)

            if save_csv_path is not None:
                self._prepare_csv_file(save_csv_path)

            if save_video_path is not None:
                if from_webcam:
                    fourcc = cv2.VideoWriter.fourcc(*"DIVX")
                else:
                    # Reuse the codec of the input video for the output.
                    fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
                fps = cap.get(cv2.CAP_PROP_FPS)
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                video_out = cv2.VideoWriter(str(save_video_path), fourcc, fps, (frame_width, frame_height))

            if self.show_video:
                cv2.namedWindow("Marker Tracker", cv2.WINDOW_AUTOSIZE)

            internal_frame_number = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # NOTE(review): OpenCV captures normally deliver BGR frames, so
                # COLOR_BGR2GRAY may be what is intended here. Left unchanged
                # because switching would alter the detection results; confirm.
                frame_gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

                frame_number = None
                if save_csv_path is not None:
                    # Same for every marker in this frame, so query it once.
                    # Fall back to our own counter when the capture reports a
                    # negative position.
                    frame_number = cap.get(cv2.CAP_PROP_POS_FRAMES)
                    if frame_number < 0:
                        frame_number = internal_frame_number

                for ml in self.marker_locators:
                    marker_pose = ml.locate_marker(frame_gray)
                    frame = self._draw_detected_markers(frame, marker_pose)
                    if save_csv_path is not None:
                        self._save_marker_to_csv(save_csv_path, marker_pose, frame_number)

                if self.show_video:
                    cv2.imshow("Marker Tracker", frame)
                if video_out is not None:
                    video_out.write(frame)
                if self._handle_keyboard_events(frame):
                    break
                internal_frame_number += 1
        finally:
            # Always release the capture, the writer and any windows, also
            # when tracking is aborted by an exception.
            if video_out is not None:
                video_out.release()
            cap.release()
            cv2.destroyAllWindows()