From 9c9c1ce7536fab1add5f855078608d7320c3e251 Mon Sep 17 00:00:00 2001 From: Patrick Gehrsitz Date: Wed, 14 Aug 2024 19:34:54 +0200 Subject: [PATCH] feat: Add basic USB-Camera support (#10) Signed-off-by: Patrick Gehrsitz --- .gitignore | 2 + README.md | 3 +- resources/spyglass.conf | 3 + scripts/spyglass | 1 + spyglass/camera.py | 45 ----- spyglass/camera/__init__.py | 25 +++ spyglass/camera/camera.py | 94 ++++++++++ spyglass/camera/csi.py | 46 +++++ spyglass/camera/usb.py | 28 +++ spyglass/camera_options.py | 8 +- spyglass/cli.py | 68 +++---- spyglass/server.py | 171 ++++++++--------- tests/test_cli.py | 365 +++++++----------------------------- 13 files changed, 374 insertions(+), 485 deletions(-) delete mode 100644 spyglass/camera.py create mode 100644 spyglass/camera/__init__.py create mode 100644 spyglass/camera/camera.py create mode 100644 spyglass/camera/csi.py create mode 100644 spyglass/camera/usb.py diff --git a/.gitignore b/.gitignore index 5408877..faf5477 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,5 @@ cover/ .idea/ venv +.venv +.vscode diff --git a/README.md b/README.md index 7d2e222..b86700f 100644 --- a/README.md +++ b/README.md @@ -57,11 +57,12 @@ On startup the following arguments are supported: | `--list-controls` | List all available libcamera controls onto the console. Those can be used with `--controls` | | | `-tf`, `--tuning_filter` | Set a tuning filter file name. | | | `-tfd`, `--tuning_filter_dir` | Set the directory to look for tuning filters. | | +| `-n`, `--camera_num` | Camera number to be used. All cameras with their number can be shown with `libcamera-hello`. | `0` | Starting the server without any argument is the same as ```shell -./run.py -b 0.0.0.0 -p 8080 -r 640x480 -f 15 -st '/stream' -sn '/snapshot' -af continuous -l 0.0 -s normal +./run.py -b 0.0.0.0 -p 8080 -r 640x480 -f 15 -st '/stream' -sn '/snapshot' -af continuous -l 0.0 -s normal -n 0 ``` The stream can then be accessed at `http://:8080/stream` diff --git a/resources/spyglass.conf b/resources/spyglass.conf index 1072b56..8e2a7f2 100644 --- a/resources/spyglass.conf +++ b/resources/spyglass.conf @@ -9,6 +9,9 @@ #### NOTE: Values has to be surrounded by double quotes! ("value") #### NOTE: If commented out or includes typos it will use hardcoded defaults! +#### Libcamera camera to use (INTEGER)[default: 0] +CAMERA_NUM="0" + #### Running Spyglass with proxy or Standalone (BOOL)[default: true] NO_PROXY="true" diff --git a/scripts/spyglass b/scripts/spyglass index 46b6261..e41d155 100755 --- a/scripts/spyglass +++ b/scripts/spyglass @@ -94,6 +94,7 @@ run_spyglass() { fi "${PY_BIN}" "$(dirname "${BASE_SPY_PATH}")/run.py" \ + --camera_num "${CAMERA_NUM:-0}" \ --bindaddress "${bind_adress}" \ --port "${HTTP_PORT:-8080}" \ --resolution "${RESOLUTION:-640x480}" \ diff --git a/spyglass/camera.py b/spyglass/camera.py deleted file mode 100644 index 2090392..0000000 --- a/spyglass/camera.py +++ /dev/null @@ -1,45 +0,0 @@ -import libcamera -from spyglass.camera_options import process_controls -from picamera2 import Picamera2 - -def init_camera( - width: int, - height: int, - fps: int, - autofocus: str, - lens_position: float, - autofocus_speed: str, - upsidedown=False, - flip_horizontal=False, - flip_vertical=False, - control_list: list[list[str]]=[], - tuning_filter=None, - tuning_filter_dir=None): - - tuning = None - - if tuning_filter: - params = {'tuning_file': tuning_filter} - if tuning_filter_dir: - params['dir'] = tuning_filter_dir - tuning = Picamera2.load_tuning_file(**params) - - picam2 = Picamera2(tuning=tuning) - controls = {'FrameRate': fps} - - c = process_controls(picam2, [tuple(ctrl) for ctrl in control_list]) - controls.update(c) - - if 'AfMode' in picam2.camera_controls: - controls['AfMode'] = autofocus - controls['AfSpeed'] = autofocus_speed - if autofocus == libcamera.controls.AfModeEnum.Manual: - controls['LensPosition'] = lens_position - else: - print('Attached camera does not support autofocus') - - transform = libcamera.Transform(hflip=int(flip_horizontal or upsidedown), vflip=int(flip_vertical or upsidedown)) - - picam2.configure(picam2.create_video_configuration(main={'size': (width, height)}, controls=controls, transform=transform)) - - return picam2 diff --git a/spyglass/camera/__init__.py b/spyglass/camera/__init__.py new file mode 100644 index 0000000..82ec6cf --- /dev/null +++ b/spyglass/camera/__init__.py @@ -0,0 +1,25 @@ +from picamera2 import Picamera2 + +from spyglass.camera.camera import Camera +from spyglass.camera.csi import CSI +from spyglass.camera.usb import USB + +def init_camera( + camera_num: int, + tuning_filter=None, + tuning_filter_dir=None + ) -> Camera: + tuning = None + + if tuning_filter: + params = {'tuning_file': tuning_filter} + if tuning_filter_dir: + params['dir'] = tuning_filter_dir + tuning = Picamera2.load_tuning_file(**params) + + picam2 = Picamera2(camera_num, tuning=tuning) + if picam2._is_rpi_camera(): + cam = CSI(picam2) + else: + cam = USB(picam2) + return cam diff --git a/spyglass/camera/camera.py b/spyglass/camera/camera.py new file mode 100644 index 0000000..77993b1 --- /dev/null +++ b/spyglass/camera/camera.py @@ -0,0 +1,94 @@ +import libcamera + +from abc import ABC, abstractmethod +from picamera2 import Picamera2 + +from spyglass import logger +from spyglass.exif import create_exif_header +from spyglass.camera_options import process_controls +from spyglass.server import StreamingServer, StreamingHandler + +class Camera(ABC): + def __init__(self, picam2: Picamera2): + self.picam2 = picam2 + + def create_controls(self, fps: int, autofocus: str, lens_position: float, autofocus_speed: str): + controls = {} + + if 'FrameRate' in self.picam2.camera_controls: + controls['FrameRate'] = fps + + if 'AfMode' in self.picam2.camera_controls: + controls['AfMode'] = autofocus + controls['AfSpeed'] = autofocus_speed + if autofocus == libcamera.controls.AfModeEnum.Manual: + controls['LensPosition'] = lens_position + else: + logger.warning('Attached camera does not support autofocus') + + return controls + + def configure(self, + width: int, + height: int, + fps: int, + autofocus: str, + lens_position: float, + autofocus_speed: str, + control_list: list[list[str]]=[], + upsidedown=False, + flip_horizontal=False, + flip_vertical=False): + controls = self.create_controls(fps, autofocus, lens_position, autofocus_speed) + c = process_controls(self.picam2, [tuple(ctrl) for ctrl in control_list]) + controls.update(c) + + transform = libcamera.Transform( + hflip=int(flip_horizontal or upsidedown), + vflip=int(flip_vertical or upsidedown) + ) + + self.picam2.configure( + self.picam2.create_video_configuration( + main={'size': (width, height)}, + controls=controls, + transform=transform + ) + ) + + def _run_server(self, + bind_address, + port, + streaming_handler: StreamingHandler, + get_frame, + stream_url='/stream', + snapshot_url='/snapshot', + orientation_exif=0): + logger.info('Server listening on %s:%d', bind_address, port) + logger.info('Streaming endpoint: %s', stream_url) + logger.info('Snapshot endpoint: %s', snapshot_url) + logger.info('Controls endpoint: %s', '/controls') + address = (bind_address, port) + streaming_handler.picam2 = self.picam2 + streaming_handler.get_frame = get_frame + streaming_handler.stream_url = stream_url + streaming_handler.snapshot_url = snapshot_url + if orientation_exif > 0: + streaming_handler.exif_header = create_exif_header(orientation_exif) + else: + streaming_handler.exif_header = None + current_server = StreamingServer(address, streaming_handler) + current_server.serve_forever() + + @abstractmethod + def start_and_run_server(self, + bind_address, + port, + stream_url='/stream', + snapshot_url='/snapshot', + orientation_exif=0): + pass + + @abstractmethod + def stop(self): + pass diff --git a/spyglass/camera/csi.py b/spyglass/camera/csi.py new file mode 100644 index 0000000..3bec8e8 --- /dev/null +++ b/spyglass/camera/csi.py @@ -0,0 +1,46 @@ +import io + +from picamera2.encoders import MJPEGEncoder +from picamera2.outputs import FileOutput +from threading import Condition + +from spyglass import camera +from spyglass.server import StreamingHandler + +class CSI(camera.Camera): + def start_and_run_server(self, + bind_address, + port, + stream_url='/stream', + snapshot_url='/snapshot', + orientation_exif=0): + + class StreamingOutput(io.BufferedIOBase): + def __init__(self): + self.frame = None + self.condition = Condition() + + def write(self, buf): + with self.condition: + self.frame = buf + self.condition.notify_all() + output = StreamingOutput() + def get_frame(inner_self): + with output.condition: + output.condition.wait() + return output.frame + + self.picam2.start_recording(MJPEGEncoder(), FileOutput(output)) + + self._run_server( + bind_address, + port, + StreamingHandler, + get_frame, + stream_url=stream_url, + snapshot_url=snapshot_url, + orientation_exif=orientation_exif + ) + + def stop(self): + self.picam2.stop_recording() diff --git a/spyglass/camera/usb.py b/spyglass/camera/usb.py new file mode 100644 index 0000000..b26adee --- /dev/null +++ b/spyglass/camera/usb.py @@ -0,0 +1,28 @@ +from spyglass import camera +from spyglass.server import StreamingHandler + +class USB(camera.Camera): + def start_and_run_server(self, + bind_address, + port, + stream_url='/stream', + snapshot_url='/snapshot', + orientation_exif=0): + def get_frame(inner_self): + #TODO: Cuts framerate in 1/n with n streams open, add some kind of buffer + return self.picam2.capture_buffer() + + self.picam2.start() + + self._run_server( + bind_address, + port, + StreamingHandler, + get_frame, + stream_url=stream_url, + snapshot_url=snapshot_url, + orientation_exif=orientation_exif + ) + + def stop(self): + self.picam2.stop() diff --git a/spyglass/camera_options.py b/spyglass/camera_options.py index 3c15321..a2bc95a 100644 --- a/spyglass/camera_options.py +++ b/spyglass/camera_options.py @@ -80,12 +80,16 @@ def parse_from_string(input_string: str) -> any: def get_type_str(obj) -> str: return str(type(obj)).split('\'')[1] -def get_libcamera_controls_string(camera_path: str) -> str: +def get_libcamera_controls_string(camera_num: str) -> str: ctrls_str = "" libcam_cm = libcamera.CameraManager.singleton() - cam = libcam_cm.cameras[0] + if camera_num > len(libcam_cm.cameras) - 1: + return ctrls_str + cam = libcam_cm.cameras[camera_num] + def rectangle_to_tuple(rectangle): return (rectangle.x, rectangle.y, rectangle.width, rectangle.height) + for k, v in cam.controls.items(): if isinstance(v.min, libcamera.Rectangle): min = rectangle_to_tuple(v.min) diff --git a/spyglass/cli.py b/spyglass/cli.py index 6c0e454..6370241 100644 --- a/spyglass/cli.py +++ b/spyglass/cli.py @@ -2,22 +2,16 @@ Parse command line arguments in, invoke server. """ + import argparse -import logging import re import sys - import libcamera -from picamera2.encoders import MJPEGEncoder -from picamera2.outputs import FileOutput - +from spyglass import camera_options, logger from spyglass.exif import option_to_exif_orientation from spyglass.__version__ import __version__ from spyglass.camera import init_camera -from spyglass.server import StreamingOutput -from spyglass.server import run_server -from spyglass import camera_options MAX_WIDTH = 1920 @@ -31,57 +25,58 @@ def main(args=None): becomes sys.exit(main()). The __main__ entry point similarly wraps sys.exit(). """ - logging.info(f"Spyglass {__version__}") + logger.info(f"Spyglass {__version__}") if args is None: args = sys.argv[1:] parsed_args = get_args(args) - bind_address = parsed_args.bindaddress - port = parsed_args.port + if parsed_args.list_controls: + controls_str = camera_options.get_libcamera_controls_string(parsed_args.camera_num) + if not controls_str: + print(f"Camera {parsed_args.camera_num} not found") + else: + print('Available controls:\n'+controls_str) + return + width, height = split_resolution(parsed_args.resolution) - stream_url = parsed_args.stream_url - snapshot_url = parsed_args.snapshot_url - orientation_exif = parsed_args.orientation_exif controls = parsed_args.controls if parsed_args.controls_string: controls += [c.split('=') for c in parsed_args.controls_string.split(',')] - if parsed_args.list_controls: - print('Available controls:\n'+camera_options.get_libcamera_controls_string(0)) - return - picam2 = init_camera( - width, - height, - parsed_args.fps, - parse_autofocus(parsed_args.autofocus), - parsed_args.lensposition, - parse_autofocus_speed(parsed_args.autofocusspeed), - parsed_args.upsidedown, - parsed_args.flip_horizontal, - parsed_args.flip_vertical, - controls, + cam = init_camera( + parsed_args.camera_num, parsed_args.tuning_filter, parsed_args.tuning_filter_dir) - output = StreamingOutput() - picam2.start_recording(MJPEGEncoder(), FileOutput(output)) - + cam.configure(width, + height, + parsed_args.fps, + parse_autofocus(parsed_args.autofocus), + parsed_args.lensposition, + parse_autofocus_speed(parsed_args.autofocusspeed), + controls, + parsed_args.upsidedown, + parsed_args.flip_horizontal, + parsed_args.flip_vertical,) try: - run_server(bind_address, port, picam2, output, stream_url, snapshot_url, orientation_exif) + cam.start_and_run_server(parsed_args.bindaddress, + parsed_args.port, + parsed_args.stream_url, + parsed_args.snapshot_url, + parsed_args.orientation_exif) finally: - picam2.stop_recording() - + cam.stop() # region args parsers - def resolution_type(arg_value, pat=re.compile(r"^\d+x\d+$")): if not pat.match(arg_value): raise argparse.ArgumentTypeError("invalid value: x expected.") return arg_value + def control_type(arg_value: str): if '=' in arg_value: return arg_value.split('=') @@ -127,7 +122,6 @@ def split_resolution(res): # region cli args - def get_args(args): """Parse arguments passed in from shell.""" return get_parser().parse_args(args) @@ -187,7 +181,7 @@ def get_parser(): parser.add_argument('-tfd', '--tuning_filter_dir', type=str, default=None, nargs='?',const="", help='Set the directory to look for tuning filters.') parser.add_argument('--list-controls', action='store_true', help='List available camera controls and exits.') - + parser.add_argument('-n', '--camera_num', type=int, default=0, help='Camera number to be used (Works with --list-controls)') return parser # endregion cli args diff --git a/spyglass/server.py b/spyglass/server.py index ed2714f..4b2cb79 100755 --- a/spyglass/server.py +++ b/spyglass/server.py @@ -1,119 +1,94 @@ -import io -import logging +#!/usr/bin/python3 + import socketserver + from http import server -from threading import Condition + +from spyglass import logger from spyglass.url_parsing import check_urls_match, get_url_params from spyglass.exif import create_exif_header from spyglass.camera_options import parse_dictionary_to_html_page, process_controls -from . import logger - - -class StreamingOutput(io.BufferedIOBase): - def __init__(self): - self.frame = None - self.condition = Condition() - - def write(self, buf): - with self.condition: - self.frame = buf - self.condition.notify_all() - class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer): allow_reuse_address = True daemon_threads = True +class StreamingHandler(server.BaseHTTPRequestHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.picam2 = None + self.exif_header = None + self.stream_url = None + self.snapshot_url = None + self.get_frame = None -def run_server(bind_address, - port, - camera, - output: StreamingOutput, - stream_url='/stream', - snapshot_url='/snapshot', - orientation_exif=0): - exif_header = create_exif_header(orientation_exif) - - class StreamingHandler(server.BaseHTTPRequestHandler): - def do_GET(self): - if check_urls_match(stream_url, self.path): - self.start_streaming() - elif check_urls_match(snapshot_url, self.path): - self.send_snapshot() - elif check_urls_match('/controls', self.path): - parsed_controls = get_url_params(self.path) - parsed_controls = parsed_controls if parsed_controls else None - processed_controls = process_controls(camera, parsed_controls) - camera.set_controls(processed_controls) - content = parse_dictionary_to_html_page(camera, parsed_controls, processed_controls).encode('utf-8') - self.send_response(200) - self.send_header('Content-Type', 'text/html') - self.send_header('Content-Length', len(content)) - self.end_headers() - self.wfile.write(content) - else: - self.send_error(404) - self.end_headers() - - def start_streaming(self): - try: - self.send_response(200) - self.send_default_headers() - self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME') - self.end_headers() - while True: - with output.condition: - output.condition.wait() - frame = output.frame - self.wfile.write(b'--FRAME\r\n') - if exif_header is None: - self.send_jpeg_content_headers(frame) - self.end_headers() - self.wfile.write(frame) - self.wfile.write(b'\r\n') - else: - self.send_jpeg_content_headers(frame, len(exif_header) - 2) - self.end_headers() - self.wfile.write(exif_header) - self.wfile.write(frame[2:]) - self.wfile.write(b'\r\n') - except Exception as e: - logging.warning('Removed streaming client %s: %s', self.client_address, str(e)) + def do_GET(self): + if check_urls_match(self.stream_url, self.path): + self.start_streaming() + elif check_urls_match(self.snapshot_url, self.path): + self.send_snapshot() + elif check_urls_match('/controls', self.path): + parsed_controls = get_url_params(self.path) + parsed_controls = parsed_controls if parsed_controls else None + processed_controls = process_controls(self.picam2, parsed_controls) + self.picam2.set_controls(processed_controls) + content = parse_dictionary_to_html_page(self.picam2, parsed_controls, processed_controls).encode('utf-8') + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.send_header('Content-Length', len(content)) + self.end_headers() + self.wfile.write(content) + else: + self.send_error(404) + self.end_headers() - def send_snapshot(self): - try: - self.send_response(200) - self.send_default_headers() - with output.condition: - output.condition.wait() - frame = output.frame - if orientation_exif <= 0: + def start_streaming(self): + try: + self.send_response(200) + self.send_default_headers() + self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME') + self.end_headers() + while True: + frame = self.get_frame() + self.wfile.write(b'--FRAME\r\n') + if self.exif_header is None: self.send_jpeg_content_headers(frame) self.end_headers() self.wfile.write(frame) + self.wfile.write(b'\r\n') else: - self.send_jpeg_content_headers(frame, len(exif_header) - 2) + self.send_jpeg_content_headers(frame, len(self.exif_header) - 2) self.end_headers() - self.wfile.write(exif_header) + self.wfile.write(self.exif_header) self.wfile.write(frame[2:]) - except Exception as e: - logging.warning( - 'Removed client %s: %s', - self.client_address, str(e)) + self.wfile.write(b'\r\n') + except Exception as e: + logger.warning('Removed streaming client %s: %s', self.client_address, str(e)) - def send_default_headers(self): - self.send_header('Age', 0) - self.send_header('Cache-Control', 'no-cache, private') - self.send_header('Pragma', 'no-cache') + def send_snapshot(self): + try: + self.send_response(200) + self.send_default_headers() + frame = self.get_frame() + if self.exif_header is None: + self.send_jpeg_content_headers(frame) + self.end_headers() + self.wfile.write(frame) + else: + self.send_jpeg_content_headers(frame, len(self.exif_header) - 2) + self.end_headers() + self.wfile.write(self.exif_header) + self.wfile.write(frame[2:]) + except Exception as e: + logger.warning( + 'Removed client %s: %s', + self.client_address, str(e)) - def send_jpeg_content_headers(self, frame, extra_len=0): - self.send_header('Content-Type', 'image/jpeg') - self.send_header('Content-Length', str(len(frame) + extra_len)) + def send_default_headers(self): + self.send_header('Age', 0) + self.send_header('Cache-Control', 'no-cache, private') + self.send_header('Pragma', 'no-cache') - logger.info('Server listening on %s:%d', bind_address, port) - logger.info('Streaming endpoint: %s', stream_url) - logger.info('Snapshot endpoint: %s', snapshot_url) - logger.info('Controls endpoint: %s', '/controls') - address = (bind_address, port) - current_server = StreamingServer(address, StreamingHandler) - current_server.serve_forever() + def send_jpeg_content_headers(self, frame, extra_len=0): + self.send_header('Content-Type', 'image/jpeg') + self.send_header('Content-Length', str(len(frame) + extra_len)) diff --git a/tests/test_cli.py b/tests/test_cli.py index a0a3a4f..5de2ac1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -19,6 +19,7 @@ DEFAULT_CONTROLS = [] DEFAULT_TUNING_FILTER = None DEFAULT_TUNING_FILTER_DIR = None +DEFAULT_CAMERA_NUM = 0 @pytest.fixture(autouse=True) @@ -76,49 +77,67 @@ def test_parse_tuning_filter_dir(): assert args.tuning_filter_dir == 'dir' -@patch("spyglass.server.run_server") @patch("spyglass.camera.init_camera") -def test_init_camera_with_defaults(mock_spyglass_camera, mock_spyglass_server): +def test_init_camera_with_defaults(mock_spyglass_camera,): from spyglass import cli import spyglass.camera cli.main(args=[]) spyglass.camera.init_camera.assert_called_once_with( + DEFAULT_CAMERA_NUM, + DEFAULT_TUNING_FILTER, + DEFAULT_TUNING_FILTER_DIR + ) + +@patch("spyglass.camera.camera.Camera.configure") +@patch("spyglass.camera.init_camera") +def test_configure_with_defaults(mock_init_camera, mock_configure): + from spyglass import cli + + cli.main(args=[]) + cam_instance = mock_init_camera.return_value + cam_instance.configure.assert_called_once_with( DEFAULT_WIDTH, DEFAULT_HEIGHT, DEFAULT_FPS, DEFAULT_AUTOFOCUS_MODE, DEFAULT_LENS_POSITION, DEFAULT_AF_SPEED, + DEFAULT_CONTROLS, DEFAULT_UPSIDE_DOWN, DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR + DEFAULT_FLIP_VERTICALLY ) - -@patch("spyglass.server.run_server") +@patch("spyglass.camera.camera.Camera.configure") @patch("spyglass.camera.init_camera") -def test_init_camera_resolution(mock_spyglass_server, mock_spyglass_camera): +def test_configure_with_parameters(mock_init_camera, mock_configure): from spyglass import cli - import spyglass.camera + cli.main(args=[ - '-r', '200x100' + '-n', '1', + '-tf', 'test', + '-tfd', 'test-dir', + '-r', '200x100', + '-f', '20', + '-af', 'manual', + '-l', '1.0', + '-s', 'normal', + '-ud', '-fh', '-fv', + '-c', 'brightness=-0.4', + '-c', 'awbenable=false', ]) - spyglass.camera.init_camera.assert_called_once_with( + cam_instance = mock_init_camera.return_value + cam_instance.configure.assert_called_once_with( 200, 100, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR + 20, + AF_MODE_ENUM_MANUAL, + 1.0, + AF_SPEED_ENUM_NORMAL, + [['brightness', '-0.4'],['awbenable', 'false']], + True, + True, + True ) @@ -138,251 +157,35 @@ def test_raise_error_when_height_greater_than_maximum(): ]) -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_fps(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-f', '20' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - 20, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_af_manual(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-af', 'manual' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - AF_MODE_ENUM_MANUAL, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - - -@patch("spyglass.server.run_server") +@patch("spyglass.camera.camera.Camera.configure") @patch("spyglass.camera.init_camera") -def test_init_camera_af_continuous(mock_spyglass_server, mock_spyglass_camera): +def test_configure_camera_af_continuous_speed_fast(mock_init_camera, mock_configure): from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-af', 'continuous' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - AF_MODE_ENUM_CONTINUOUS, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_lens_position(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-l', '1.0' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - 1.0, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_af_speed_normal(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-s', 'normal' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - AF_SPEED_ENUM_NORMAL, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_af_speed_fast(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera cli.main(args=[ + '-af', 'continuous', '-s', 'fast' ]) - spyglass.camera.init_camera.assert_called_once_with( + cam_instance = mock_init_camera.return_value + cam_instance.configure.assert_called_once_with( DEFAULT_WIDTH, DEFAULT_HEIGHT, DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, + AF_MODE_ENUM_CONTINUOUS, DEFAULT_LENS_POSITION, AF_SPEED_ENUM_FAST, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_upside_down(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-ud' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - True, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_flip_horizontal(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-fh' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - True, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_flip_vertical(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-fv' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, DEFAULT_UPSIDE_DOWN, DEFAULT_FLIP_HORIZONTALLY, - True, - DEFAULT_CONTROLS, - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR + DEFAULT_FLIP_VERTICALLY ) -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_controls(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-c', 'brightness=-0.4', - '-c', 'awbenable=false' - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - [['brightness', '-0.4'],['awbenable', 'false']], - DEFAULT_TUNING_FILTER, - DEFAULT_TUNING_FILTER_DIR - ) - -@patch("spyglass.server.run_server") +@patch("spyglass.camera.csi.CSI.start_and_run_server") @patch("spyglass.camera.init_camera") -def test_run_server_with_configuration_from_arguments(mock_spyglass_server, mock_spyglass_camera): +def test_run_server_with_configuration_from_arguments(mock_init_camera, mock_run_server): from spyglass import cli - import spyglass.server + cli.main(args=[ '-b', '1.2.3.4', '-p', '1234', @@ -390,10 +193,17 @@ def test_run_server_with_configuration_from_arguments(mock_spyglass_server, mock '-sn', 'snapshot-url', '-or', 'h' ]) - spyglass.server.run_server.assert_called_once_with('1.2.3.4', 1234, ANY, ANY, 'streaming-url', 'snapshot-url', 1) + cam_instance = mock_init_camera.return_value + cam_instance.start_and_run_server.assert_called_once_with( + '1.2.3.4', + 1234, + 'streaming-url', + 'snapshot-url', + 1 + ) -@patch("spyglass.server.run_server") +@patch("spyglass.camera.csi.CSI.start_and_run_server") @patch("spyglass.camera.init_camera") @pytest.mark.parametrize("input_value, expected_output", [ ('h', 1), @@ -405,7 +215,7 @@ def test_run_server_with_configuration_from_arguments(mock_spyglass_server, mock ('mhr90', 7), ('r270', 8), ]) -def test_run_server_with_orientation(mock_spyglass_server, mock_spyglass_camera, input_value, expected_output): +def test_run_server_with_orientation(mock_init_camera, mock_run_server, input_value, expected_output): from spyglass import cli import spyglass.server cli.main(args=[ @@ -415,60 +225,11 @@ def test_run_server_with_orientation(mock_spyglass_server, mock_spyglass_camera, '-sn', 'snapshot-url', '-or', input_value ]) - spyglass.server.run_server.assert_called_once_with( + cam_instance = mock_init_camera.return_value + cam_instance.start_and_run_server.assert_called_once_with( '1.2.3.4', 1234, - ANY, - ANY, 'streaming-url', 'snapshot-url', expected_output ) - - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_using_only_tuning_filter_file(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-tf', 'test', - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - "test", - DEFAULT_TUNING_FILTER_DIR - ) - -@patch("spyglass.server.run_server") -@patch("spyglass.camera.init_camera") -def test_init_camera_using_tuning_filters(mock_spyglass_server, mock_spyglass_camera): - from spyglass import cli - import spyglass.camera - cli.main(args=[ - '-tf', 'test', - '-tfd', 'test-dir', - ]) - spyglass.camera.init_camera.assert_called_once_with( - DEFAULT_WIDTH, - DEFAULT_HEIGHT, - DEFAULT_FPS, - DEFAULT_AUTOFOCUS_MODE, - DEFAULT_LENS_POSITION, - DEFAULT_AF_SPEED, - DEFAULT_UPSIDE_DOWN, - DEFAULT_FLIP_HORIZONTALLY, - DEFAULT_FLIP_VERTICALLY, - DEFAULT_CONTROLS, - "test", - "test-dir" - )