Skip to content

Commit 17c1665

Browse files
authored
Fix mypy errors in src/frheed/cameras (#25)
1 parent 79e3f2c commit 17c1665

File tree

3 files changed

+72
-99
lines changed

3 files changed

+72
-99
lines changed

pyproject.toml

+12-6
Original file line numberDiff line numberDiff line change
@@ -58,21 +58,27 @@ include = [
5858
packages = ["src/frheed"]
5959

6060
[tool.ruff]
61-
# Increase the line length from the default of 88
6261
line-length = 100
63-
64-
# Always autofix when possible
6562
fix = true
6663

6764
[tool.ruff.lint]
68-
# Specify rules explicitly to avoid configuration drift due to defaults changing
6965
select = [
7066
"E", # all pyflakes rules
7167
"F", # all pycodestyle rules
7268
"I", # all isort rules
7369
]
74-
75-
# Rules to exclude from autofix
7670
unfixable = [
7771
"F401", # unused imports
7872
]
73+
74+
[tool.mypy]
75+
disallow_untyped_defs = true
76+
warn_return_any = true
77+
78+
# Third-party libraries without stubs
79+
[[tool.mypy.overrides]]
80+
module = [
81+
"PySpin",
82+
]
83+
follow_imports = "silent"
84+
ignore_missing_imports = true

src/frheed/cameras/flir/__init__.py

+33-48
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
Adapted from simple_pyspin: https://github.com/klecknerlab/simple_pyspin
44
"""
55

6+
from __future__ import annotations
7+
8+
import collections
9+
import logging
610
import time
7-
from collections import deque
8-
from typing import Tuple, Union
11+
from types import TracebackType
12+
from typing import Any
913

1014
import numpy as np
1115
import PySpin
@@ -63,9 +67,7 @@
6367
"SerialReceiveQueueClear": "Clear Serial Port",
6468
}
6569

66-
_DEBUG = __name__ == "__main__"
67-
68-
_SYSTEM = None
70+
_SYSTEM: PySpin.System | None = None
6971

7072

7173
def list_cameras() -> PySpin.CameraList:
@@ -178,7 +180,7 @@ class FlirCamera:
178180
PySpin.intfICommand: "command",
179181
}
180182

181-
def __init__(self, src: Union[int, str] = 0, lock: bool = False):
183+
def __init__(self, src: int | str = 0, lock: bool = False):
182184
"""
183185
Parameters
184186
----------
@@ -196,9 +198,7 @@ def __init__(self, src: Union[int, str] = 0, lock: bool = False):
196198
super().__setattr__("lock", lock)
197199

198200
cam_list = list_cameras()
199-
200-
if _DEBUG:
201-
print(f"Found {cam_list.GetSize()} FLIR camera(s)")
201+
logging.debug("Found %s FLIR camera(s)", cam_list.GetSize())
202202

203203
self._src_type = type(src)
204204
self._src = src
@@ -216,10 +216,10 @@ def __init__(self, src: Union[int, str] = 0, lock: bool = False):
216216

217217
# Other attributes which may be accessed later
218218
self._running = False
219-
self._frame_times = deque()
219+
self._frame_times: collections.deque[float] = collections.deque()
220220
self._incomplete_image_count = 0
221221

222-
def __getattr__(self, attr: str) -> object:
222+
def __getattr__(self, attr: str) -> Any:
223223
# Add this in so @property decorator works as expected
224224
if attr in self.__dict__:
225225
return self.__dict__[attr]
@@ -265,11 +265,16 @@ def __setattr__(self, attr: str, val: object) -> None:
265265
else:
266266
super().__setattr__(attr, val)
267267

268-
def __enter__(self) -> "FlirCamera":
268+
def __enter__(self) -> FlirCamera:
269269
self.init()
270270
return self
271271

272-
def __exit__(self, type, value, traceback) -> None:
272+
def __exit__(
273+
self,
274+
exc_type: type[BaseException] | None,
275+
exc: BaseException | None,
276+
traceback: TracebackType | None,
277+
) -> None:
273278
self.close()
274279

275280
def __del__(self) -> None:
@@ -388,7 +393,7 @@ def height(self) -> int:
388393
return int(height)
389394

390395
@property
391-
def shape(self) -> Tuple[int, int]:
396+
def shape(self) -> tuple[int, int]:
392397
"""Get the camera array dimensions (Height x Width)"""
393398
if not self.initialized:
394399
self.init()
@@ -405,10 +410,11 @@ def init(self) -> None:
405410
Initializes the camera. Automatically called if the camera is opened
406411
using a 'with' clause.
407412
"""
408-
409413
self.cam.Init()
410414

411-
for node in self.cam.GetNodeMap().GetNodes():
415+
node_map: PySpin.INodeMap = self.cam.GetNodeMap()
416+
nodes: list[PySpin.INode] = node_map.GetNodes()
417+
for node in nodes:
412418
pit = node.GetPrincipalInterfaceType()
413419
name = node.GetName()
414420
self.camera_node_types[name] = self._attr_type_names.get(pit, pit)
@@ -456,7 +462,7 @@ def stop(self) -> None:
456462

457463
if self.running:
458464
self.cam.EndAcquisition()
459-
self._frame_times = deque()
465+
self._frame_times.clear()
460466
self._incomplete_image_count = 0
461467
self._running = False
462468

@@ -474,11 +480,10 @@ def close(self) -> None:
474480
pass
475481

476482
# Reset attributes
477-
self.camera_attributes = {}
478-
self.camera_methods = {}
479-
self.camera_node_types = {}
483+
self.camera_attributes: dict[str, Any] = {}
484+
self.camera_methods: dict[str, PySpin.CCommandPtr] = {}
485+
self.camera_node_types: dict[str, str] = {}
480486
self._initialized = False
481-
# self.system.ReleaseInstance()
482487

483488
def get_image(self, wait: bool = True) -> PySpin.ImagePtr:
484489
"""
@@ -513,7 +518,7 @@ def get_image(self, wait: bool = True) -> PySpin.ImagePtr:
513518

514519
def get_array(
515520
self, wait: bool = True, get_chunk: bool = False, complete_frames_only: bool = False
516-
) -> Union[np.ndarray, Tuple[np.ndarray, PySpin.PySpin.ChunkData]]:
521+
) -> np.ndarray | tuple[np.ndarray, PySpin.PySpin.ChunkData]:
517522
"""
518523
Get an image from the camera, and convert it to a numpy array.
519524
@@ -547,12 +552,14 @@ def get_array(
547552
if len(self._frame_times) > 3600:
548553
self._frame_times.popleft()
549554

555+
arr: np.ndarray = img.GetNDArray()
550556
if get_chunk:
551-
return img.GetNDArray(), img.GetChunkData()
557+
chunk: PySpin.PySpin.ChunkData = img.GetChunkData()
558+
return (arr, chunk)
552559
else:
553-
return img.GetNDArray()
560+
return arr
554561

555-
def get_info(self, name: str) -> dict:
562+
def get_info(self, name: str) -> dict[str, Any]:
556563
"""
557564
Get information on a camera node (attribute or method).
558565
@@ -570,7 +577,7 @@ def get_info(self, name: str) -> dict:
570577
- "unit": the unit of the value (as a string).
571578
- "min" and "max": the min/max value.
572579
"""
573-
info = {"name": name}
580+
info: dict[str, Any] = {"name": name}
574581

575582
if name in self.camera_attributes:
576583
node = self.camera_attributes[name]
@@ -716,25 +723,3 @@ def document(self, verbose: bool = True) -> str:
716723
lines.append("")
717724

718725
return "\n".join(lines)
719-
720-
721-
if __name__ == "__main__":
722-
723-
def test():
724-
with FlirCamera() as cam:
725-
print(cam.document(verbose=False))
726-
print(cam)
727-
cam.start()
728-
while True:
729-
try:
730-
global image
731-
image = cam.get_array()
732-
# print(cam.incomplete_image_count, cam.real_fps)
733-
print(cam.DeviceTemperature)
734-
735-
except KeyboardInterrupt:
736-
break
737-
738-
# test()
739-
740-
print(list_cameras())

src/frheed/cameras/usb/__init__.py

+27-45
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22
Connecting to USB cameras.
33
"""
44

5+
from __future__ import annotations
6+
7+
import collections
8+
import logging
59
import os
610
import time
7-
from collections import deque
8-
from typing import List, Optional, Tuple, Union
11+
from types import TracebackType
12+
from typing import Any
913

1014
import cv2
1115
import numpy as np
@@ -16,8 +20,6 @@
1620
# https://stackoverflow.com/a/54585850/10342097
1721
os.environ["OPENCV_VIDEOIO_PRIORITY_MSMF"] = "0"
1822

19-
_DEBUG = __name__ == "__main__"
20-
2123
# Get non-platform-specific capture properties (0 < id < 50)
2224
_CAP_PROPS = [
2325
prop for prop in dir(cv2) if prop.startswith("CAP_PROP") and getattr(cv2, prop, 1000) < 50
@@ -43,7 +45,7 @@
4345
_DEFAULT_BACKEND = cv2.CAP_DSHOW # cv2.CAP_DSHOW or cv2.CAP_MSMF
4446

4547

46-
def list_cameras() -> List[int]:
48+
def list_cameras() -> list[int]:
4749
"""
4850
Get list of indices of available USB cameras.
4951
@@ -129,11 +131,8 @@ class UsbCamera:
129131
}
130132

131133
def __init__(
132-
self,
133-
src: Union[int, str] = 0,
134-
lock: bool = False,
135-
backend: Optional[int] = _DEFAULT_BACKEND, # cv2.CAP_DSHOW
136-
):
134+
self, src: int | str = 0, lock: bool = False, backend: int | None = _DEFAULT_BACKEND
135+
) -> None:
137136
"""
138137
Parameters
139138
----------
@@ -149,10 +148,12 @@ def __init__(
149148
super().__setattr__("camera_attributes", {})
150149
super().__setattr__("camera_methods", {})
151150
super().__setattr__("lock", lock)
151+
self.camera_attributes: dict[str, Any]
152+
self.camera_methods: dict[str, Any]
153+
self.lock: bool
152154

153155
cam_list = list_cameras()
154-
if _DEBUG:
155-
print(f"Found {len(cam_list)} USB camera(s)")
156+
logging.debug("Found %s USB camera(s)", len(cam_list))
156157

157158
if not cam_list:
158159
raise CameraError("No USB cameras detected.")
@@ -174,10 +175,10 @@ def __init__(
174175

175176
# Other attributes which may be accessed later
176177
self._running = True # camera is running as soon as you connect to it
177-
self._frame_times = deque()
178+
self._frame_times: collections.deque[float] = collections.deque()
178179
self._incomplete_image_count = 0
179180

180-
def __getattr__(self, attr: str) -> object:
181+
def __getattr__(self, attr: str) -> Any:
181182
# Add this in so @property decorator works as expected
182183
if attr in self.__dict__:
183184
return self.__dict__[attr]
@@ -191,7 +192,7 @@ def __getattr__(self, attr: str) -> object:
191192
else:
192193
raise AttributeError(attr)
193194

194-
def __setattr__(self, attr: str, val: object) -> None:
195+
def __setattr__(self, attr: str, val: Any) -> None:
195196
if attr in self.camera_attributes:
196197
propId = getattr(cv2, attr, None)
197198
if propId is None:
@@ -205,8 +206,7 @@ def __setattr__(self, attr: str, val: object) -> None:
205206

206207
success = self.cam.set(propId, val)
207208
result = "succeeded" if success else "failed"
208-
if _DEBUG or not success:
209-
print(f"Setting {attr} to {val} {result}")
209+
logging.debug("Setting %s to %s %s", attr, val, result)
210210

211211
else:
212212
if attr == "__class__":
@@ -216,11 +216,16 @@ def __setattr__(self, attr: str, val: object) -> None:
216216
else:
217217
super().__setattr__(attr, val)
218218

219-
def __enter__(self) -> "UsbCamera":
219+
def __enter__(self) -> UsbCamera:
220220
self.init()
221221
return self
222222

223-
def __exit__(self, type, value, traceback) -> None:
223+
def __exit__(
224+
self,
225+
exc_type: type[BaseException] | None,
226+
exc: BaseException | None,
227+
traceback: TracebackType | None,
228+
) -> None:
224229
self.close()
225230

226231
def __del__(self) -> None:
@@ -275,10 +280,10 @@ def height(self) -> int:
275280
return int(self.CAP_PROP_FRAME_HEIGHT)
276281

277282
@property
278-
def shape(self) -> Tuple[int, int]:
283+
def shape(self) -> tuple[int, int]:
279284
return (self.width, self.height)
280285

281-
def init(self):
286+
def init(self) -> None:
282287
if not self.initialized:
283288
self.cam.open(self._src)
284289

@@ -291,7 +296,7 @@ def start(self, continuous: bool = True) -> None:
291296
self._running = True
292297

293298
def stop(self) -> None:
294-
self._frame_times = []
299+
self._frame_times.clear()
295300
self._incomplete_image_count = 0
296301
self._running = False
297302

@@ -337,26 +342,3 @@ def get_info(self, name: str) -> dict:
337342
"""Get information about a camera node (attribute or method)."""
338343
# TODO: Fully implement
339344
return {"name": name}
340-
341-
342-
if __name__ == "__main__":
343-
344-
def test():
345-
from PIL import Image
346-
347-
with UsbCamera() as cam:
348-
while True:
349-
try:
350-
for prop in cam._cap_props:
351-
print(prop, (getattr(cam, prop)))
352-
353-
break
354-
355-
array = cam.get_array()
356-
Image.fromarray(array).show()
357-
break
358-
359-
except KeyboardInterrupt:
360-
break
361-
362-
test()

0 commit comments

Comments
 (0)