Skip to content

Commit

Permalink
bug fix for default arg of streamtype
Browse files Browse the repository at this point in the history
  • Loading branch information
tyler-romero committed Jan 12, 2024
1 parent 94a9784 commit 20ec140
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions src/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
-f, --fps=FPS number of frames to capture per second. 0 to use maximum rate possible. [default: 5]
-h, --help show this message.
-s, --stream=STREAM id, filename or URL of a video stream (e.g. rtsp://host:port/script?params OR movie.mp4 OR *.jpg) [default: 0]
-x, --streamtype=TYPE (optional) type of stream. One of [device, directory, rtsp, youtube, file, image_url] [default: auto-infer]
-x, --streamtype=TYPE type of stream. One of [infer, device, directory, rtsp, youtube, file, image_url] [default: infer]
-t, --token=TOKEN api token to authenticate with the groundlight api
-v, --verbose enable debug logs
-w, --width=WIDTH resize images to w pixels wide (and scale height proportionately if not set explicitly)
Expand All @@ -25,13 +25,10 @@
import math
import os
import time
from asyncio import QueueEmpty
from logging.config import dictConfig
from operator import truediv
from queue import Empty, Queue
from threading import Thread
from typing import Tuple
from xmlrpc.client import Boolean

import cv2
import docopt
Expand All @@ -55,15 +52,19 @@ def force_exit(self):
self.exit_all_threads = True


def frame_processor(q: Queue, client: Groundlight, detector: str, control: ThreadControl):
def frame_processor(
q: Queue, client: Groundlight, detector: str, control: ThreadControl
):
logger.debug(f"frame_processor({q=}, {client=}, {detector=})")
global thread_control_request_exit
while True:
if control.exit_all_threads:
logger.debug("exiting worker thread.")
break
try:
frame = q.get(timeout=1) # timeout avoids deadlocked orphan when main process dies
frame = q.get(
timeout=1
) # timeout avoids deadlocked orphan when main process dies
except Empty:
continue
try:
Expand Down Expand Up @@ -127,7 +128,9 @@ def parse_crop_string(crop_string: str) -> Tuple[float, float, float, float]:

for n in numbers:
if (n < 0) or (n > 1):
raise ValueError("All numbers must be between 0 and 1, showing relative position in image")
raise ValueError(
"All numbers must be between 0 and 1, showing relative position in image"
)

if numbers[0] + numbers[2] > 1.0:
raise ValueError("Invalid crop: x+w is greater than 1.")
Expand Down Expand Up @@ -171,16 +174,24 @@ def main():

STREAM = args["--stream"]
STREAM_TYPE = args.get("--streamtype")
if STREAM_TYPE is None:
STREAM_TYPE = STREAM_TYPE.lower()
if STREAM_TYPE not in [
"infer",
"device",
"directory",
"rtsp",
"youtube",
"file",
"image_url",
]:
raise ValueError(f"Invalid stream type {STREAM_TYPE=}")
logger.debug(f"{STREAM_TYPE=}")
if STREAM_TYPE == "infer":
try:
STREAM = int(STREAM)
except ValueError as e:
logger.debug(f"{STREAM=} is not an int. Treating as a filename or url.")
else:
STREAM_TYPE = STREAM_TYPE.lower()
if STREAM_TYPE not in ["device", "directory", "rtsp", "youtube", "file", "image_url"]:
raise ValueError(f"Invalid stream type {STREAM_TYPE=}")
logger.debug(f"{STREAM_TYPE=}")
STREAM_TYPE = None

FPS = args["--fps"]
try:
Expand Down Expand Up @@ -223,15 +234,20 @@ def main():

logger.debug(f"creating groundlight client with {ENDPOINT=} and {TOKEN=}")
gl = Groundlight(endpoint=ENDPOINT, api_token=TOKEN)
grabber = FrameGrabber.create_grabber(stream=STREAM, stream_type=STREAM_TYPE, fps_target=FPS)
grabber = FrameGrabber.create_grabber(
stream=STREAM, stream_type=STREAM_TYPE, fps_target=FPS
)
q = Queue()
tc = ThreadControl()
if motion_detect:
m = MotionDetector(pct_threshold=motion_threshold)
workers = []

for i in range(worker_thread_count):
thread = Thread(target=frame_processor, kwargs=dict(q=q, client=gl, detector=DETECTOR, control=tc))
thread = Thread(
target=frame_processor,
kwargs=dict(q=q, client=gl, detector=DETECTOR, control=tc),
)
workers.append(thread)
thread.start()

Expand All @@ -252,7 +268,9 @@ def main():
continue

now = time.time()
logger.debug(f"captured a new frame after {now-start:.3f}s of size {frame.shape=} ")
logger.debug(
f"captured a new frame after {now-start:.3f}s of size {frame.shape=} "
)

if crop_region:
frame = crop_frame(frame, crop_region)
Expand Down Expand Up @@ -292,7 +310,9 @@ def main():
f"Falling behind the desired {FPS=}. Either grabbing frames or putting them into output queue (length={q.qsize()}) is taking too long."
)
else:
logger.debug(f"waiting for {actual_delay=:.3}s to capture the next frame.")
logger.debug(
f"waiting for {actual_delay=:.3}s to capture the next frame."
)
time.sleep(actual_delay)

except KeyboardInterrupt:
Expand Down

0 comments on commit 20ec140

Please sign in to comment.