Source code for optibeam.camera

from .utils import *
from abc import ABC, abstractmethod
import numpy as np
import cv2
from pypylon import pylon
from datetime import datetime


class Camera(ABC):
    """Interface that every concrete camera driver must implement."""

    @abstractmethod
    def get_info(self) -> dict:
        """Describe the camera.

        Returns:
            dict: Camera details such as model, resolution, and other
                relevant parameters.
        """

    @abstractmethod
    def set_camera_params(self, params: dict):
        """Reconfigure the camera from a dictionary of settings.

        Parameters:
            params (dict): Camera parameter settings such as exposure,
                ISO, etc.
        """

    @abstractmethod
    def ptp_status(self) -> bool:
        """Report whether the Precision Time Protocol (PTP) is enabled.

        Returns:
            bool: True if PTP is enabled, False otherwise.
        """

    @abstractmethod
    def enable_ptp(self) -> None:
        """Turn on the Precision Time Protocol (PTP), if supported."""

    @abstractmethod
    def open(self) -> None:
        """Open the camera so that images can be captured."""

    @abstractmethod
    def close(self) -> None:
        """Close the camera and release its resources."""

    @abstractmethod
    def capture(self) -> np.ndarray:
        """Capture an image with the camera.

        Returns:
            np.ndarray: An array representing the captured image.
        """
class BaslerCamera(Camera):
    """Camera implementation backed by a pypylon ``InstantCamera``.

    PTP reference:
    https://docs.baslerweb.com/precision-time-protocol#checking-the-status-of-the-ptp-clock-synchronization
    """

    def __init__(self, camera: pylon.InstantCamera, params: dict = None):
        """Open the given camera and apply the initial configuration.

        Parameters:
            camera (pylon.InstantCamera): The pylon camera handle to drive.
            params (dict, optional): Camera parameter settings passed on to
                ``set_camera_params``. Defaults to an empty dictionary.
        """
        self.camera = camera
        self.open()
        # Build a fresh dict per call instead of sharing one mutable default.
        self.set_camera_params({} if params is None else params)

    def open(self):
        """(Re)open the underlying device.

        Any existing session is closed first on a best-effort basis. Does
        nothing when no camera handle is attached.
        """
        if self.camera is None:
            return
        try:
            self.camera.Close()
        except Exception as exc:
            # Closing is only a precaution before reopening, so a failure
            # here is reported but must not prevent the Open() below.
            print("Ignoring error while closing camera before reopen: ", exc)
        self.camera.Open()

    def close(self):
        """Stop grabbing (if active) and close the camera."""
        if self.camera.IsGrabbing():
            self.camera.StopGrabbing()
        self.camera.Close()

    def get_info(self) -> dict:
        """Retrieve information about the Basler camera.

        Returns:
            dict: Camera details such as model, serial number, device
                version, device class and resolution. "Camera ID" is None
                unless a ``camera_id`` attribute has been set on the
                instance.
        """
        info = self.camera.GetDeviceInfo()
        return {
            # camera_id is not assigned by __init__; fall back to None
            # instead of raising AttributeError.
            "Camera ID": getattr(self, "camera_id", None),
            "Camera Information": info.GetModelName(),
            "Camera Serial Number": info.GetSerialNumber(),
            "Camera Device Version": info.GetDeviceVersion(),
            "Camera Device Class": info.GetDeviceClass(),
            "Camera Resolution": (
                self.camera.Width.GetValue(),
                self.camera.Height.GetValue(),
            ),
        }

    def set_camera_params(self, params: dict):
        """Configure the converter and the camera, then make sure it is grabbing.

        NOTE: ``params`` is currently only logged; the exposure, gain and
        gamma values below are fixed.

        Parameters:
            params (dict): A dictionary containing camera parameter settings
                such as exposure, ISO, etc.
        """
        # Convert grabbed frames to mono8 images for simplicity.
        self.converter = pylon.ImageFormatConverter()
        self.converter.OutputPixelFormat = pylon.PixelType_Mono8
        self.output_dim = [self.camera.Width.GetValue(), self.camera.Height.GetValue()]

        # Switch exposure and gain to manual mode before adjusting them.
        self.camera.ExposureAuto.SetValue('Off')
        self.camera.GainAuto.SetValue('Off')
        self.camera.GammaEnable.SetValue(True)  # Enable gamma correction if supported

        # Example values; adjust to your needs and camera capabilities.
        self.camera.ExposureTimeRaw.SetValue(100000)  # Exposure time: 100000 microseconds
        self.camera.GainRaw.SetValue(100)
        self.camera.Gamma.SetValue(1.0)  # Gamma 1.0 (if supported)

        # StartGrabbing requires that the camera is not already grabbing,
        # so guard it to keep repeated calls to this method safe.
        if not self.camera.IsGrabbing():
            self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
        print(f"Resetting camera parameters: {params}")

    def capture(self) -> Generator[np.ndarray, None, None]:
        """Yield frames from the camera indefinitely.

        Each iteration restarts grabbing if it has stopped, waits up to
        5000 ms for a frame and yields it as a mono8 array. On any error a
        black placeholder frame with the text "No Image Input" is yielded
        instead, and the next iteration retries.

        Yields:
            np.ndarray: The converted frame, or the placeholder image.
        """
        while True:
            grab_result = None
            try:
                # Reconnecting happens inside the try so that a failed
                # reopen yields the placeholder and is retried next round.
                if not self.camera.IsGrabbing():
                    self.open()
                    self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
                grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
                if grab_result.GrabSucceeded():
                    # Convert to OpenCV format
                    frame = self.converter.Convert(grab_result).GetArray()
                    yield frame
            except Exception as e:
                print("Error encountered: ", e)
                placeholder = cv2.putText(
                    np.zeros((self.output_dim[1], self.output_dim[0]), np.uint8),
                    "No Image Input",
                    (100, 240),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    2,
                    (255, 255, 255),
                    2,
                )
                yield placeholder
                # No break here; the next iteration attempts to reconnect.
            finally:
                # Release the buffer on every path: failed grabs, errors,
                # and the generator being closed while suspended at a yield.
                if grab_result is not None:
                    grab_result.Release()

    def demo_video(self):
        """Show a live preview window with an exposure slider.

        ESC closes the preview; 's' saves the current frame as a
        timestamped PNG under ``../../ResultsCenter/images``.
        """
        import os  # local import: only needed to create the snapshot folder

        save_to = "../../ResultsCenter/images"
        window = 'Camera Output'

        def apply_exposure(milliseconds):
            # The slider starts at 0; clamp to 1 ms so a zero exposure is
            # never sent to the camera.
            self.camera.ExposureTimeRaw.SetValue(max(1, milliseconds) * 1000)

        # Start the slider at the camera's current exposure, in milliseconds,
        # clamped to the slider's 1..1000 range.
        current_ms = int(self.camera.ExposureTimeRaw.GetValue()) // 1000
        initial_ms = min(1000, max(1, current_ms))

        cv2.namedWindow(window)
        cv2.createTrackbar('Exposure time (ms)', window, initial_ms, 1000, apply_exposure)
        try:
            for img in self.capture():
                cv2.imshow(window, img)
                key = cv2.waitKey(1)
                if key == 27:  # ESC key to exit
                    break
                if key == ord('s'):  # 's' key to save the image
                    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                    filename = f"{save_to}/{timestamp}.png"
                    os.makedirs(save_to, exist_ok=True)
                    # imwrite reports failure through its return value.
                    if cv2.imwrite(filename, img):
                        print(f"Image saved as {filename}")
                    else:
                        print(f"Failed to save image as {filename}")
        finally:
            cv2.destroyWindow(window)

    def ptp_status(self) -> bool:
        """Placeholder: PTP status checking is not implemented yet (returns None)."""
        pass

    def enable_ptp(self) -> None:
        """Placeholder: enabling PTP is not implemented yet."""
        pass
class Synchronizer:
    """Coordinates several camera objects for PTP-synchronized capture."""

    def __init__(self, cameras: List[Camera]):
        """Store the cameras and run the PTP initialization hook.

        Parameters:
            cameras (List[Camera]): A list of camera objects to be
                synchronized and managed.
        """
        self.cameras = cameras
        self.initialize_ptp()

    def camera_registration(self):
        """Placeholder; currently does nothing."""

    def initialize_ptp(self):
        """Initialize PTP on all cameras to synchronize them.

        Placeholder; currently does nothing.
        """

    def take_images(self):
        """Placeholder; currently does nothing."""
# ------------------- other functionalities -------------------
def num_of_cameras_detected():
    """Count the cameras currently attached to the computer.

    Prints a summary line and, for every detected device, its model name
    and serial number.

    Returns:
        int: The number of cameras detected.
    """
    # Enumerate every device known to the pylon transport layer factory.
    devices = pylon.TlFactory.GetInstance().EnumerateDevices()
    count = len(devices)

    if count == 0:
        print("No cameras detected.")
    else:
        print("Number of cameras detected:", count)

    # One line per camera, numbered from 1 (no output for an empty list).
    for position, device in enumerate(devices, start=1):
        print(f"Camera {position}: {device.GetModelName()} - Serial Number: {device.GetSerialNumber()}")

    return count