Skip to content

Commit 7651aeb

Browse files
committed
[tests] Add FakeSubtractor to test event boundary merging
1 parent 4a70db4 commit 7651aeb

1 file changed

Lines changed: 93 additions & 2 deletions

File tree

tests/test_scan_context.py

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
import platform
1717
import typing as ty
1818

19+
import numpy as np
1920
import pytest
21+
from scenedetect import FrameTimecode
2022

2123
from dvr_scan.region import Point
22-
from dvr_scan.scanner import DetectorType, MotionScanner
23-
from dvr_scan.subtractor import SubtractorCNT, SubtractorCudaMOG2
24+
from dvr_scan.scanner import DetectorType, MotionEvent, MotionScanner
25+
from dvr_scan.subtractor import Subtractor, SubtractorCNT, SubtractorCudaMOG2
26+
from dvr_scan.video_joiner import VideoJoiner, AVAILABLE_BACKENDS
2427

2528
MACHINE_ARCH = platform.machine().upper()
2629

@@ -267,3 +270,91 @@ def test_merge_within_time_before(traffic_camera_video):
267270
compare_event_lists(
268271
event_list, TRAFFIC_CAMERA_EVENTS_MERGE_WITHIN_TIME_BEFORE, EVENT_FRAME_TOLERANCE
269272
)
273+
274+
275+
class FakeVideo(VideoJoiner):
276+
def __init__(self):
277+
self._position = FrameTimecode(0, fps=self.framerate)
278+
self._backend = AVAILABLE_BACKENDS["opencv"]
279+
pass
280+
281+
@property
282+
def paths(self):
283+
return ["fake_path.mp4"]
284+
285+
@property
286+
def resolution(self):
287+
return (1, 1)
288+
289+
@property
290+
def framerate(self) -> float:
291+
return 1.0
292+
293+
@property
294+
def total_frames(self) -> int:
295+
return 1000
296+
297+
@property
298+
def decode_failures(self) -> float:
299+
return 0
300+
301+
@property
302+
def position(self) -> FrameTimecode:
303+
return self._position + 1
304+
305+
@property
306+
def position_ms(self) -> float:
307+
return self._position.get_seconds() / 1000.0
308+
309+
def read(self, decode: bool = True) -> ty.Optional[np.ndarray]:
310+
if self._position.get_frames() >= self.total_frames:
311+
return None
312+
img = np.zeros((self.resolution[1], self.resolution[0], 3), dtype=np.uint8)
313+
self._position += 1
314+
return img
315+
316+
def seek(self, target: FrameTimecode):
317+
pass
318+
319+
320+
def test_fake_video():
321+
# With default subtractor it won't have any motion, it's just empty frames.
322+
scanner = MotionScanner(FakeVideo())
323+
assert scanner.scan().event_list == []
324+
325+
326+
# A fake subtractor we control to give a specific set of frame scores to test boundary and event
327+
# merging behaviors.
328+
class FakeSubtractor(Subtractor):
329+
def __init__(self, events: ty.List[MotionEvent]):
330+
self._frame_num = 0
331+
assert events
332+
self._events = events
333+
self._curr_event = 0
334+
335+
def apply(self, frame: np.ndarray) -> np.ndarray:
336+
self._frame_num += 1
337+
frame = np.copy(frame[:, :, 0])
338+
if self._curr_event >= len(self._events):
339+
return frame
340+
if self._frame_num > self._events[self._curr_event].end:
341+
self._curr_event += 1
342+
return frame
343+
if self._frame_num > self._events[self._curr_event].start:
344+
return np.add(frame, 254) # Scores of 255 are rejected by default.
345+
return frame
346+
347+
@staticmethod
348+
def is_available():
349+
return True
350+
351+
352+
def test_fake_subtractor():
353+
scanner = MotionScanner(FakeVideo())
354+
base_time = FrameTimecode(0, scanner._input.framerate)
355+
expected_events = [MotionEvent(start=(base_time + 100), end=(base_time + 999))]
356+
scanner._subtractor = FakeSubtractor(events=expected_events)
357+
# TODO(#72): This should be the same as the above list ideally, figure out why it's not.
358+
assert scanner.scan().event_list == [
359+
MotionEvent(start=(base_time + 99), end=(base_time + 1001))
360+
]

0 commit comments

Comments
 (0)