Skip to content

Commit

Permalink
Fixed forced to automatic mode transition. Changed multiprocessing to…
Browse files Browse the repository at this point in the history
… use queues instead of events.
  • Loading branch information
blu006 committed Dec 2, 2024
1 parent 552d0c6 commit 2488a6b
Showing 1 changed file with 70 additions and 45 deletions.
115 changes: 70 additions & 45 deletions security_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import hmac
import time
import copy
import queue
from enum import Enum
from typing import Optional
from dataclasses import dataclass
Expand Down Expand Up @@ -96,6 +97,12 @@ def wr_hmac(msg, token):
obj = hmac.new(token, msg=str.encode(msg), digestmod=hashlib.sha256)
return Utils.b64enc(obj.digest())

@staticmethod
def clear_queue(quu):
""" This helper clears a queue"""
while not quu.empty():
_ = quu.get()

class AutoMotionTimer(threading.Thread):
""" Timer thread for monitor shutdown """
def __init__(self, autoEvent, inEvent, screenOn, screenOff):
Expand All @@ -117,27 +124,34 @@ def run(self):
"""
counter = 0
limit = 900
last = self._auto.is_set()
logging.debug("Automatic control start.")
while not self._event.wait(1):
# get input status and clear it
is_set = self._input.is_set()
if is_set:
trig = self._input.is_set()
if trig:
self._input.clear()
# is automatic mode allowed
if self._auto.is_set():
if is_set:
counter = 0
# screen on
self._on_fun()

# increment counter until limit
if counter < limit:
counter += 1

if self._auto.is_set():
# if triggered, turn on and reset counter
if trig:
counter = 0
# screen on
self._on_fun()
# turn off the screen
# or turn back to a known-on state if we are resuming automatic control
if counter >= limit:
# screen off
self._off_fun()
elif last is False:
# screen on
self._on_fun()

last = self._auto.is_set()

logging.debug("Automatic control stop.")

Expand Down Expand Up @@ -201,7 +215,6 @@ def stop(self):

class SecurityMonitor():
""" Security Monitor Windowing and Splitting """
# TODO use queue for return instead
urls = ["rtsp://maglab:magcat@connor.maglab:8554/Camera1_sub",
"rtsp://maglab:magcat@connor.maglab:8554/Camera2_sub"]

Expand All @@ -212,14 +225,13 @@ class SecurityMonitor():
# 2 -> 2x2
# 3 -> 3x2
# 4 -> 3x3
def __init__(self, quit_event, splitter_refresh_rate, div_idx):
def __init__(self, quit_queue, splitter_refresh_rate, div_idx):
self.refresh_rate = splitter_refresh_rate
self._event_all = quit_event
self._queue_all = quit_queue
self._calc_div(div_idx)

self.evt = [multiprocessing.Event() for _ in range(self._div[2]*2)]
self.que = [multiprocessing.Queue() for _ in range(self._div[2]*2)]
self.proc = [None] * (self._div[2]*2)
self.event_w = threading.Event()

# Helper Functions
# generate position string based on divisions and index
Expand Down Expand Up @@ -288,8 +300,9 @@ def _idx2pos(self, idx):
return [idx % self._div[0], idx // self._div[0]]

# this process actually contains the mpv stream player
def _play_process(self, event_in, event_out, name):
def _play_process(self, queue_in, queue_out, name):
idx = name % self._div[2]
geo_str = self._gen_geo_str(idx)
player = mpv.MPV()
# a series of configuration options that make the player act like a
# security monitor
Expand All @@ -298,7 +311,6 @@ def _play_process(self, event_in, event_out, name):
player.keepaspect = "no"
player.ao = "pulseaudio"
player.profile = "low-latency"
geo_str = self._gen_geo_str(idx)
player.geometry = geo_str
# enter the camera URL and wait until it starts to play
player.play(self.urls[idx])
Expand All @@ -308,30 +320,34 @@ def _play_process(self, event_in, event_out, name):
logging.debug(f"Waiting for player {name} to start...")
player.wait_until_playing(timeout=30)
# set the output event to terminate the player behind this one
# pylint: disable-next=broad-exception-caught
except Exception as exc:
logging.error(f"Player {name} stopped while waiting to start playing: {str(exc)}")
player.terminate()
finally:
logging.debug(f"Asking player below {name} to end.")
event_out.set()
queue_out.put(True)

try:
while not event_in.is_set():
try:
player.wait_for_event(None, timeout=1)
except TimeoutError:
# this is normal. the function should be timing out.
continue
except mpv.ShutdownError:
while True:
try:
_ = queue_in.get(timeout=1)
# if the queue returns actual data, shut down this thread
break
except queue.Empty:
# normal exception. the queue should return empty most of the time
# we use this as an opportunity to run the "finally" block and check the player
continue
finally:
if player.core_shutdown:
# shut everything down if the player shuts down unexpectedly
logging.critical("Unexpected player shutdown. Shutting down.")
self._event_all.set()
except KeyboardInterrupt:
logging.warning("Player caught Keyboard Interrupt.")
continue
finally:
logging.info(f"Player {name} stopping.")
player.terminate()
del player
self._queue_all.put(True)
# pylint: disable-next=lost-exception
break

logging.info(f"Player {name} stopping.")
player.terminate()
del player

# helper function to spawn a player
def _handle_player(self, last_p, running = True):
Expand All @@ -346,11 +362,13 @@ def _handle_player(self, last_p, running = True):
last_p = (last_p + self._div[2]) % (self._div[2] * 2)
logging.info(f"Starting player: {i_play}")
self.proc[i_play] = multiprocessing.Process(target=self._play_process, args=(
self.evt[i_play],
self.evt[last_p],
self.que[i_play],
self.que[last_p],
i_play))
self.proc[i_play].daemon = True
self.evt[i_play].clear()
# clear the queue
while not self.que[i_play].empty():
_ = self.que[i_play].get()
self.proc[i_play].start()
logging.info(f"Player process started: {i_play}")

Expand All @@ -365,7 +383,7 @@ def main(self):
self._handle_player(i, False)
time_cnt = 0
p_cnt = 0
while not self._event_all.is_set():
while True:
time_cnt += 1
if time_cnt >= self.refresh_rate:
time_cnt = 0
Expand All @@ -378,11 +396,18 @@ def main(self):
logging.error(f"Killing stuck player {p_cnt}")
self.proc[p_cnt].kill()
p_cnt = (p_cnt + 1) % (self._div[2]*2)
self.event_w.wait(1)
try:
_ = self._queue_all.get(timeout=1)
# if the queue returns data, shut everything down
break
except queue.Empty:
# normal exception. the queue should return empty unless we are exiting.
continue

finally:
logging.info("Asking player processes to exit...")
for cur_ev in self.evt:
cur_ev.set()
for cur_q in self.que:
cur_q.put(True)

logging.info("Waiting for player processes...")
for curr_proc, _ in enumerate(self.proc):
Expand Down Expand Up @@ -434,7 +459,7 @@ def __init__(self):
# turns the screen off
self.screen_off = threading.Event()
# stops video
self.stop_playing = multiprocessing.Event()
self.stop_playing = multiprocessing.Queue()
# exits this program
self.monitor_exit = threading.Event()

Expand Down Expand Up @@ -539,17 +564,17 @@ def mon_on(self):
""" turns the monitor on """
if self.screen_off.is_set():
self.screen_off.clear()
self.stop_playing.clear()
Utils.clear_queue(self.stop_playing)

def mon_off(self):
""" turns the monitor off """
if not self.screen_off.is_set():
self.screen_off.set()
self.stop_playing.set()
self.stop_playing.put(True)

def mon_restart(self):
""" restarts the internal video wall class """
self.stop_playing.set()
self.stop_playing.put(True)
self.mtstate = self.MTState.RESTART

def on_message(self, _, __, msg):
Expand Down Expand Up @@ -589,7 +614,7 @@ def signal_handler(self, signum, frame):
# pylint: disable=unused-argument
""" signal handling helper function """
logging.warning(f"Caught a deadly signal: {signum}!")
self.stop_playing.set()
self.stop_playing.put(True)
self.monitor_exit.set()

def _mt_loop(self):
Expand All @@ -598,7 +623,7 @@ def _mt_loop(self):
logging.debug(f"Montior Loop State: {self.mtstate}")
# execution
if self.mtstate == self.MTState.PLAYING:
self.stop_playing.clear()
Utils.clear_queue(self.stop_playing)
sm2 = SecurityMonitor(self.stop_playing, self.config.splitter_refresh_rate, 1)
sm2.urls = self.config.urls
if self.pm_able:
Expand Down

0 comments on commit 2488a6b

Please sign in to comment.