1+ """
2+ Scripts for MP4 files's support
3+
4+ """
5+ import threading
6+ from collections import OrderedDict
7+
8+ import openslide
9+ from openslide import OpenSlideError
10+ import numpy as np
11+ import cv2
12+ from PIL import Image
13+ try :
14+ from util .enums import FrameType
15+ except ImportError :
16+ from enums import FrameType
17+
18+
19+ class ReadableMP4Dataset (openslide .ImageSlide ):
20+ def __init__ (self , filename , cache_size = 32 , max_cache_bytes = None ):
21+ self .slide_path = filename
22+
23+ self ._cap = None
24+ self ._cap_lock = threading .RLock ()
25+ self ._frame_cache = OrderedDict ()
26+ self ._cache_size = cache_size
27+
28+ self ._max_cache_bytes = max_cache_bytes # optional based on memory
29+ self ._cache_bytes = 0
30+
31+ cap = cv2 .VideoCapture (filename )
32+ if not cap .isOpened ():
33+ raise OpenSlideError (f"Could not open video file: { filename } " )
34+ # Get video properties
35+ self ._width = int (cap .get (cv2 .CAP_PROP_FRAME_WIDTH ))
36+ self ._height = int (cap .get (cv2 .CAP_PROP_FRAME_HEIGHT ))
37+ self .numberOfLayers = int (cap .get (cv2 .CAP_PROP_FRAME_COUNT )) # total number of frames
38+ self .fps = cap .get (cv2 .CAP_PROP_FPS )
39+
40+ cap .release ()
41+
42+ self ._dimensions = (self ._width , self ._height )
43+
44+ def __reduce__ (self ):
45+ return (self .__class__ , (self .slide_path ,))
46+
47+ def close (self ):
48+ with self ._cap_lock :
49+ if self ._cap is not None :
50+ self ._cap .release ()
51+ self ._cap = None
52+
53+ self ._frame_cache .clear ()
54+ self ._cache_bytes = 0
55+
56+ def __del__ (self ):
57+ self .close ()
58+
59+ def __enter__ (self ):
60+ return self
61+
62+ def __exit__ (self , * args ):
63+ self .close ()
64+
65+ @property
66+ def properties (self ):
67+ return {
68+ openslide .PROPERTY_NAME_BACKGROUND_COLOR : '000000' ,
69+ openslide .PROPERTY_NAME_MPP_X : 0 ,
70+ openslide .PROPERTY_NAME_MPP_Y : 0 ,
71+ openslide .PROPERTY_NAME_OBJECTIVE_POWER : 1 ,
72+ openslide .PROPERTY_NAME_VENDOR : 'MP4'
73+ }
74+
75+
76+ @property
77+ def dimensions (self ):
78+ return self ._dimensions
79+
80+ @property
81+ def frame_descriptors (self ) -> list [str ]:
82+ """ returns a list of strings, used as descriptor for each frame
83+ """
84+ return ['%.2f' % (x / self .fps ) for x in range (self .nFrames )]
85+
86+ @property
87+ def frame_type (self ):
88+ return FrameType .TIMESERIES
89+
90+ @property
91+ def default_frame (self ) -> list [str ]:
92+ return 0
93+
94+ @property
95+ def nFrames (self ):
96+ return self .numberOfLayers
97+
98+ @property
99+ def level_dimensions (self ):
100+ return (self .dimensions ,)
101+
102+ @property
103+ def level_count (self ):
104+ return 1
105+
106+ def _get_capture (self ):
107+ if self ._cap is None or not self ._cap .isOpened ():
108+ self ._cap = cv2 .VideoCapture (self .slide_path )
109+ return self ._cap
110+
111+ def _frame_num_bytes (self , frame_arr : np .ndarray ) -> int :
112+ """
113+
114+ Calculate the number of bytes used by a frame array.
115+ :param frame_arr: Description
116+ """
117+ try :
118+ return int (frame_arr .nbytes )
119+ except Exception :
120+ return 0
121+
122+ def _evict_if_needed (self ):
123+ """
124+ Evict frames by LRU
125+ """
126+
127+ # Evict frames if exceeding max frame count
128+ while self ._cache_size is not None and len (self ._frame_cache ) > self ._cache_size :
129+ old_idx , old_frame = self ._frame_cache .popitem (last = False )
130+ self ._cache_bytes -= self ._frame_num_bytes (old_frame )
131+ # Evict based on byte size
132+ if self ._max_cache_bytes is not None :
133+ while self ._cache_bytes > self ._max_cache_bytes and len (self ._frame_cache ) > 0 :
134+ old_idx , old_frame = self ._frame_cache .popitem (last = False )
135+ self ._cache_bytes -= self ._frame_num_bytes (old_frame )
136+
137+ def _read_frame (self , frame_idx : int ):
138+ """
139+ Before reading the frame, check the cache first with thread safety.
140+ Followed by LRU cache eviction policy.
141+
142+ :param frame_idx:
143+ """
144+ with self ._cap_lock :
145+ cached = self ._frame_cache .get (frame_idx )
146+ if cached is not None :
147+ self ._frame_cache .move_to_end (frame_idx )
148+ return cached
149+
150+ cap = self ._get_capture ()
151+ cap .set (cv2 .CAP_PROP_POS_FRAMES , frame_idx )
152+ success , img = cap .read ()
153+ if not success :
154+ return None
155+ # Convert BGR to RGBA
156+ img_rgb = cv2 .cvtColor (img , cv2 .COLOR_BGR2RGBA )
157+ self ._frame_cache [frame_idx ] = img_rgb
158+ self ._frame_cache .move_to_end (frame_idx , last = True )
159+ self ._cache_bytes += self ._frame_num_bytes (img_rgb )
160+
161+ self ._evict_if_needed ()
162+ return img_rgb
163+
164+
165+ def get_thumbnail (self , size ):
166+ return self .read_region ((0 ,0 ),0 , self .dimensions ).resize (size )
167+
168+ def read_region (self , location , level , size , frame = 0 ):
169+
170+ """
171+ Reads a region from a specific video frame.
172+ Return a PIL.Image containing the contents of the region.
173+ Reference: https://github.com/DeepMicroscopy/Exact/commit/4d52b614fa41328bf08367d99e088c1e838fb05a
174+
175+
176+ location: (x, y) tuple giving the top left pixel in the level 0
177+ reference frame.
178+ level: the level number.
179+ size: (width, height) tuple giving the region size.
180+ frame: the frame index to read from the video.
181+
182+ """
183+ if level != 0 :
184+ raise OpenSlideError ("Only level 0 is supported for video files." )
185+
186+ if any (s < 0 for s in size ):
187+ raise OpenSlideError (f"Size { size } must be non-negative" )
188+
189+ # Clamp frame index
190+ frame = max (0 , min (frame , self .numberOfLayers - 1 ))
191+ img_rgb = self ._read_frame (frame )
192+ if img_rgb is None :
193+ # Return a transparent tile if frame read fails
194+ return Image .new ("RGBA" , size , (0 , 0 , 0 , 0 ))
195+
196+ x , y = location
197+ w , h = size
198+
199+ # Create the transparent canvas
200+ tile = Image .new ("RGBA" , size , (0 , 0 , 0 , 0 ))
201+
202+ # Calculate crop boundaries within the source image
203+ img_h , img_w = img_rgb .shape [:2 ]
204+
205+ # Source coordinates
206+ src_x1 = max (0 , min (x , img_w ))
207+ src_y1 = max (0 , min (y , img_h ))
208+ src_x2 = max (0 , min (x + w , img_w ))
209+ src_y2 = max (0 , min (y + h , img_h ))
210+
211+ # Destination coordinates (where to paste on the tile)
212+ dst_x1 = max (0 , - x ) if x < 0 else 0
213+ dst_y1 = max (0 , - y ) if y < 0 else 0
214+
215+ # Extract the crop using numpy slicing
216+ crop_data = img_rgb [src_y1 :src_y2 , src_x1 :src_x2 ]
217+
218+ if crop_data .size > 0 :
219+ crop_img = Image .fromarray (crop_data )
220+ tile .paste (crop_img , (dst_x1 , dst_y1 ))
221+
222+ return tile
223+
224+ def get_duration (self ):
225+ """Get the length of the video in seconds."""
226+ return self .numberOfLayers / self .fps if self .fps > 0 else 0
227+
228+ def time_to_frame (self , time_seconds : float ) -> int :
229+ """Convert time in seconds to frame index."""
230+ frame_idx = int (time_seconds * self .fps )
231+ return max (0 , min (frame_idx , self .numberOfFrames - 1 ))
232+
233+ def frame_to_time (self , frame_idx : int ) -> float :
234+ """Convert frame index to time in seconds."""
235+ return frame_idx / self .fps if self .fps > 0 else 0
0 commit comments