-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreceiver_utils.py
More file actions
427 lines (376 loc) · 13.2 KB
/
receiver_utils.py
File metadata and controls
427 lines (376 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
import argparse
from common_utils import (
apply_required_external_defaults,
get_logger,
MULTICAST_IP,
CAMERA_FRAME_WIDTH,
CAMERA_FRAME_HEIGHT,
)
import threading
from typing import List, Optional
import numpy as np
import time
from PyQt6.QtWidgets import QMainWindow, QLabel, QApplication, QWidget, QVBoxLayout
from PyQt6.QtGui import QPixmap, QImage
from PyQt6.QtCore import Qt, pyqtSlot, QCoreApplication, QObject, QEvent
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
class QuitFilter(QObject):
    """Event filter that stops the receiver and quits the app when 'Q' is pressed.

    Args:
        receiver: Object exposing a ``stop()`` method; it is called before the
            Qt event loop is asked to quit.
    """

    def __init__(self, receiver):
        super().__init__()
        self.receiver = receiver

    def eventFilter(self, obj, event):
        """Intercept key presses delivered to the watched object.

        Returns:
            True when the 'Q' key was handled (the event is consumed); False for
            every other event, or when handling the quit request raised.
        """
        if event.type() != QEvent.Type.KeyPress:
            return False
        try:
            if event.key() == Qt.Key.Key_Q:
                self.receiver.stop()
                QCoreApplication.quit()
                return True
        except Exception as exc:
            # Best-effort: never let an error escape into the Qt event loop,
            # but report it instead of silently discarding it.
            logger.error(f"[quit-filter] Failed to handle quit key: {exc}")
        return False
# Module-level logger obtained from the project's shared get_logger helper.
logger = get_logger(__name__)
# Initialize GStreamer once, at import time, without passing any arguments to it.
Gst.init(None)
# Sleep interval (seconds) between checks of the stop event in a receiver thread.
GSTREAMER_CONNECTION_POLL_INTERVAL_SECONDS = 0.1
def handle_arguments():
    """Build the receiver command-line parser and return the parsed arguments.

    After the options are declared, ``apply_required_external_defaults`` is
    called with the parser and the string ``"receiver-only"`` (presumably to
    install default values -- confirm against ``common_utils``). If it raises
    ``RuntimeError`` the message is logged and the process exits with status 2.

    Returns:
        argparse.Namespace: The parsed command-line options.

    Raises:
        SystemExit: With code 2 when the external defaults cannot be applied.
            ``argparse`` may also exit on invalid command-line input.
    """
    parser = argparse.ArgumentParser(
        prog="camera_receiver",
        description="Receive one or more camera streams using GStreamer Multicast",
    )
    parser.add_argument(
        "--broadcast-ip",
        type=str,
        help="Legacy IP argument (ignored for multicast)",
    )
    parser.add_argument(
        "--base-port",
        type=int,
        help="Starting port for the first camera. Subsequent cameras use base-port+index",
    )
    parser.add_argument(
        "--count",
        type=int,
        help="Number of sequential ports to subscribe to starting at base-port",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        help="Total setup timeout in seconds (discovery + initial connection)",
    )
    parser.add_argument(
        "--auto-config",
        type=str,
        choices=["on", "off"],
        help="Auto-configure from discovery announcements",
    )
    parser.add_argument(
        "--streamer-name-filter",
        type=str,
        help="Only accept discovery packets from this streamer name",
    )
    parser.add_argument(
        "--discovery-port",
        type=int,
        help="UDP port used for discovery announcements",
    )
    parser.add_argument(
        "--discovery-timeout",
        type=float,
        help="Optional override for discovery phase timeout in seconds",
    )
    try:
        apply_required_external_defaults(parser, "receiver-only")
    except RuntimeError as exc:
        logger.error(f"[defaults] {exc}")
        # The bare ``exit()`` builtin is injected by the ``site`` module and is
        # not guaranteed to exist; raising SystemExit is always available and
        # produces the same exit status.
        raise SystemExit(2) from exc
    return parser.parse_args()
class FrameStore:
    """Lock-protected mapping from stream name to the most recent frame."""

    def __init__(self):
        self._frames: dict[str, np.ndarray] = {}
        self._lock = threading.Lock()

    def set_latest(self, stream_name: str, frame: np.ndarray) -> Optional[Exception]:
        """Store a private copy of ``frame`` as the latest frame of ``stream_name``.

        Failures are reported through the return value rather than raised.

        Returns:
            None on success, otherwise the exception that prevented the store
            (for example ``AttributeError`` when ``frame`` is ``None`` or has no
            ``copy()`` method, or ``TypeError`` for an unhashable stream name).
        """
        try:
            # Copy before taking the lock so readers are not blocked while the
            # pixel data is duplicated.
            frame_copy = frame.copy()
            with self._lock:
                self._frames[stream_name] = frame_copy
        except (AttributeError, TypeError, RuntimeError) as exc:
            return exc
        return None

    def remove_stream(self, stream_name: str):
        """Forget ``stream_name``; unknown names are ignored."""
        try:
            with self._lock:
                self._frames.pop(stream_name, None)
        except TypeError:
            # An unhashable name can never have been stored; nothing to remove.
            pass

    def snapshot_keys(self) -> List[str]:
        """Return a new list with the names of all streams currently stored."""
        with self._lock:
            return list(self._frames)

    def get_frame(self, stream_name: str):
        """Return the stored frame for ``stream_name``, or None if there is none."""
        with self._lock:
            return self._frames.get(stream_name)
class SingleReceiver:
    """Receive one multicast RTP/H.264 stream and publish decoded BGR frames.

    ``start()`` is meant to run in its own thread: it builds the GStreamer
    pipeline, waits for the first decoded frame, then idles until a stop is
    requested. Frames are delivered by the appsink ``new-sample`` callback and
    written to ``frame_store`` under the name ``"<window_prefix>-<port>"``.

    Args:
        port: UDP port of the multicast stream.
        timeout: Seconds to wait for the first frame; ``None`` waits until a
            frame arrives or a stop is requested.
        stop_event: Shared event; when set, the receiver shuts down.
        frame_store: Destination for the latest decoded frame.
        window_prefix: Prefix used to build the stream/window name.
    """

    def __init__(self, port: int, timeout: float, stop_event: threading.Event, frame_store: FrameStore, window_prefix: str):
        self.port = port
        self.timeout = timeout
        self.stop_event = stop_event
        self.frame_store = frame_store
        self.window_prefix = window_prefix
        self.pipeline = None
        self.appsink = None
        self._first_frame_event = threading.Event()
        self._appsink_handler_id = None
        self._stop_requested = False
        # Running count of frames the frame store refused, used in log messages.
        self._frame_store_failures = 0

    def start(self):
        """Build and run the pipeline until a stop is requested.

        Returns early (after releasing the pipeline) when the pipeline cannot
        be created or started, when no frame arrives within ``timeout``, or
        when a stop is requested before the first frame.
        """
        pipeline_str = (
            f"udpsrc multicast-group={MULTICAST_IP} port={self.port} auto-multicast=true ! "
            "application/x-rtp,media=video,clock-rate=90000,payload=96,encoding-name=H264 ! "
            "rtpjitterbuffer latency=0 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! video/x-raw,format=BGR ! appsink name=appsink emit-signals=true max-buffers=1 drop=true sync=false"
        )
        window_name = f"{self.window_prefix}-{self.port}"
        logger.info(f"[{window_name}] Attempting to connect to multicast {MULTICAST_IP}:{self.port}...")
        logger.info(f"[{window_name}] Pipeline: {pipeline_str}")
        try:
            self.pipeline = Gst.parse_launch(pipeline_str)
            self.appsink = self.pipeline.get_by_name("appsink")
            if self.appsink is None:
                logger.error(f"[{window_name}] Failed to find appsink element in pipeline")
                self.stop()
                return
            try:
                self._appsink_handler_id = self.appsink.connect("new-sample", self._on_new_sample)
            except Exception as e:
                # Without the callback no frame can ever be delivered, so give
                # up immediately instead of waiting for the timeout.
                logger.error(f"[{window_name}] Failed to connect appsink handler: {e}")
                self.stop()
                return
            ret = self.pipeline.set_state(Gst.State.PLAYING)
            if ret == Gst.StateChangeReturn.FAILURE:
                logger.error(f"[{window_name}] Failed to set pipeline to PLAYING state")
                self.stop()
                return
        except Exception as e:
            logger.error(f"[{window_name}] Failed to create GStreamer pipeline: {e}")
            self.stop()
            return

        connection_timeout = self.timeout
        if not self._wait_for_first_frame():
            if self._is_stop_requested():
                logger.info(f"[{window_name}] Stop requested before the first frame arrived")
            else:
                logger.error(f"Failed to connect to {MULTICAST_IP}:{self.port} (timeout after {connection_timeout}s)")
            self.stop()
            # A frame may have slipped in between the timeout and the teardown.
            self.frame_store.remove_stream(window_name)
            return
        logger.info(f"Connected to {MULTICAST_IP}:{self.port} -> window '{window_name}'")

        # Keep the thread alive until a stop is requested.
        try:
            while not self._is_stop_requested():
                time.sleep(GSTREAMER_CONNECTION_POLL_INTERVAL_SECONDS)
        finally:
            # Tear the pipeline down first so a late callback cannot re-insert
            # a frame after the stream has been removed from the store.
            self.stop()
            self.frame_store.remove_stream(window_name)

    def _is_stop_requested(self) -> bool:
        """Return True once the shared stop event is set or ``stop()`` was called."""
        return self.stop_event.is_set() or self._stop_requested

    def _wait_for_first_frame(self) -> bool:
        """Wait for the first frame in short slices so a stop request is honoured.

        Returns:
            True if a frame arrived; False on timeout or when a stop was
            requested first.
        """
        deadline = None if self.timeout is None else time.monotonic() + self.timeout
        while not self._first_frame_event.is_set():
            if self._is_stop_requested():
                return False
            wait_for = GSTREAMER_CONNECTION_POLL_INTERVAL_SECONDS
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                wait_for = min(wait_for, remaining)
            self._first_frame_event.wait(timeout=wait_for)
        return True

    @staticmethod
    def _frame_from_bytes(frame_data: np.ndarray, width: int, height: int) -> np.ndarray:
        """Shape a flat uint8 buffer into a ``(height, width, 3)`` image.

        Tightly packed buffers are reshaped directly. Otherwise the rows are
        assumed to carry trailing padding (GStreamer's default stride for
        24-bit RGB/BGR is the row size rounded up to a multiple of 4 bytes) and
        the padding is dropped.

        Raises:
            ValueError: If the buffer size does not match the dimensions.
        """
        row_bytes = width * 3
        if frame_data.size == height * row_bytes:
            return frame_data.reshape((height, width, 3))
        stride = frame_data.size // height if height > 0 else 0
        if stride < row_bytes or stride * height != frame_data.size:
            raise ValueError(
                f"buffer of {frame_data.size} bytes does not match a {width}x{height} BGR frame"
            )
        return frame_data.reshape((height, stride))[:, :row_bytes].reshape((height, width, 3))

    def _process_sample(self, sample, window_name: str, current_frame_store_failures: int) -> int:
        """Process a GStreamer sample and store the frame. Returns updated failure count."""
        try:
            buffer = sample.get_buffer()
            caps = sample.get_caps()
            success, mapinfo = buffer.map(Gst.MapFlags.READ)
            if not success:
                return current_frame_store_failures
            try:
                # Frame dimensions come from the negotiated caps.
                structure = caps.get_structure(0)
                width = structure.get_value("width")
                height = structure.get_value("height")
                frame_data = np.frombuffer(mapinfo.data, dtype=np.uint8)
                frame = self._frame_from_bytes(frame_data, width, height)
                # The store copies the frame, so it is safe to unmap afterwards.
                frame_store_error = self.frame_store.set_latest(window_name, frame)
                if frame_store_error is not None:
                    current_frame_store_failures += 1
                    logger.error(f"[{window_name}] frame store failed (failure #{current_frame_store_failures}): {frame_store_error}")
                return current_frame_store_failures
            finally:
                buffer.unmap(mapinfo)
        except Exception as e:
            logger.error(f"[{window_name}] Error processing sample: {e}")
            return current_frame_store_failures

    def _on_new_sample(self, appsink):
        """GStreamer appsink 'new-sample' callback.

        Always returns ``Gst.FlowReturn.OK`` so a bad frame does not stop the
        pipeline.
        """
        try:
            sample = appsink.emit("pull-sample")
            if sample is None:
                return Gst.FlowReturn.OK
            window_name = f"{self.window_prefix}-{self.port}"
            # Carry the failure count across callbacks so the log shows a
            # running total instead of always "failure #1".
            self._frame_store_failures = self._process_sample(
                sample, window_name, self._frame_store_failures
            )
            # Mark the first sample as received (unblocks start()).
            if not self._first_frame_event.is_set():
                self._first_frame_event.set()
            return Gst.FlowReturn.OK
        except Exception as e:
            logger.error(f"[{self.window_prefix}-{self.port}] Error in new-sample callback: {e}")
            return Gst.FlowReturn.OK

    def stop(self):
        """Stop the receiver: disconnect callbacks and set pipeline to NULL.

        Safe to call more than once and from a thread other than the one
        running ``start()``; errors are logged, never raised.
        """
        self._stop_requested = True
        window_name = f"{self.window_prefix}-{self.port}"
        # Take ownership of the handler id so a second call does not try to
        # disconnect the same handler again.
        handler_id, self._appsink_handler_id = self._appsink_handler_id, None
        if self.appsink is not None and handler_id is not None:
            try:
                self.appsink.disconnect(handler_id)
            except Exception as exc:
                logger.error(f"[{window_name}] Failed to disconnect appsink handler: {exc}")
        if self.pipeline is not None:
            try:
                self.pipeline.set_state(Gst.State.NULL)
            except Exception as exc:
                logger.error(f"[{window_name}] Failed to set pipeline to NULL state: {exc}")
class StreamDisplayWidget(QMainWindow):
    """Controller that creates one top-level window per stream (simple multi-window mode).

    The widget itself is a small controller window. Each call to
    ``update_frames`` opens a window for every new stream reported by the
    receiver, closes windows whose stream disappeared, and repaints the rest.

    Args:
        receiver: Object providing ``get_sorted_stream_names()``,
            ``get_frame(name)`` and ``stop()``.
        grid_cols: Stored on the instance; not used by the multi-window layout.
    """

    def __init__(self, receiver, grid_cols: int = 4):
        super().__init__()
        self.receiver = receiver
        self.grid_cols = grid_cols
        # Map stream_name -> (window, label, event_filter or None)
        self.stream_windows: dict[str, tuple[QMainWindow, QLabel, Optional[QObject]]] = {}
        self.setWindowTitle("WRecorder - Stream Display")
        # Keep a small controller window (can be minimized)
        self.resize(320, 40)

    def _compute_window_size(self) -> tuple[int, int]:
        """Compute a sensible default size for per-stream windows based on screen and camera size."""
        screen = QApplication.primaryScreen()
        if screen is None:
            return CAMERA_FRAME_WIDTH, CAMERA_FRAME_HEIGHT
        avail = screen.availableGeometry()
        # At most half the available screen, never below 320x240.
        max_w = max(320, min(CAMERA_FRAME_WIDTH, avail.width() // 2))
        max_h = max(240, min(CAMERA_FRAME_HEIGHT, avail.height() // 2))
        return int(max_w), int(max_h)

    def _open_stream_window(self, name: str) -> None:
        """Create, show and register the window that displays stream ``name``."""
        win = QMainWindow()
        win.setWindowTitle(name)
        label = QLabel()
        label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        w, h = self._compute_window_size()
        label.setFixedSize(w, h)
        # Zero-margin container so the label area matches the desired size.
        container = QWidget()
        layout = QVBoxLayout()
        layout.setContentsMargins(0, 0, 0, 0)
        layout.setSpacing(0)
        layout.addWidget(label)
        container.setLayout(layout)
        win.setCentralWidget(container)
        win.resize(w, h)
        win.show()
        # Install an event filter so pressing 'Q' in this window quits the app.
        # The filter is stored with the window so it is not garbage collected.
        try:
            filter_obj = QuitFilter(self.receiver)
            win.installEventFilter(filter_obj)
        except Exception as exc:
            logger.error(f"[{name}] Failed to install quit key filter: {exc}")
            filter_obj = None
        self.stream_windows[name] = (win, label, filter_obj)

    def _close_stream_window(self, name: str) -> None:
        """Unregister and close the window of a stream that disappeared."""
        win, _label, filt = self.stream_windows.pop(name)
        try:
            if filt is not None:
                try:
                    win.removeEventFilter(filt)
                except Exception as exc:
                    logger.error(f"[{name}] Failed to remove quit key filter: {exc}")
            win.close()
        except Exception as exc:
            logger.error(f"[{name}] Failed to close stream window: {exc}")

    @staticmethod
    def _show_frame(label, frame) -> None:
        """Paint a BGR frame into ``label``, scaled to fit with aspect ratio kept."""
        height, width = frame.shape[:2]
        # Reverse the channel axis (BGR -> RGB); tobytes() yields a packed copy
        # that stays alive until the pixmap has been built from it.
        frame_bytes = frame[:, :, ::-1].tobytes()
        q_img = QImage(frame_bytes, width, height, 3 * width, QImage.Format.Format_RGB888)
        pix = QPixmap.fromImage(q_img)
        pix = pix.scaled(
            label.width(),
            label.height(),
            Qt.AspectRatioMode.KeepAspectRatio,
            Qt.TransformationMode.SmoothTransformation,
        )
        label.setPixmap(pix)

    @pyqtSlot()
    def update_frames(self):
        """Synchronise the per-stream windows with the receiver and repaint them."""
        names_in_order = self.receiver.get_sorted_stream_names()
        live_names = set(names_in_order)
        shown_names = set(self.stream_windows)

        # Create windows for newly discovered streams, in sorted order so the
        # windows appear deterministically.
        for name in names_in_order:
            if name not in shown_names:
                self._open_stream_window(name)

        # Remove windows for disconnected streams.
        for name in shown_names - live_names:
            self._close_stream_window(name)

        # Update frames for existing windows in sorted order.
        for name in names_in_order:
            entry = self.stream_windows.get(name)
            if entry is None:
                continue
            label = entry[1]
            frame = self.receiver.get_frame(name)
            if frame is None:
                # setText() replaces whatever the label showed before; calling
                # setPixmap() afterwards would wipe the text again.
                label.setText(f"Waiting for {name}...")
                continue
            self._show_frame(label, frame)

    def keyPressEvent(self, event):
        """Handle key press events: 'Q' stops the receiver and quits the app."""
        if event.key() == Qt.Key.Key_Q:
            logger.info("Quit key pressed. Stopping receivers...")
            self.receiver.stop()
            # Ensure Qt main loop exits
            try:
                QCoreApplication.quit()
            except Exception:
                self.close()
        else:
            super().keyPressEvent(event)

    def closeEvent(self, event):
        """Handle window close event by stopping the receiver."""
        logger.info("Window closed. Stopping receivers...")
        self.receiver.stop()
        event.accept()
class MultiReceiver:
    """Run one SingleReceiver thread per port, all sharing one FrameStore and stop event."""

    def __init__(self, ports: List[int], timeout: float, window_prefix: str):
        self.ports = ports
        self.timeout = timeout
        self.window_prefix = window_prefix
        self.stop_event = threading.Event()
        self.threads: List[threading.Thread] = []
        self.frame_store = FrameStore()
        self.sub_receivers: List[SingleReceiver] = []

    def start(self):
        """Create a receiver for every configured port and start it in its own thread."""
        for listen_port in self.ports:
            receiver = SingleReceiver(
                listen_port,
                self.timeout,
                self.stop_event,
                self.frame_store,
                self.window_prefix,
            )
            worker = threading.Thread(target=receiver.start)
            worker.start()
            self.threads.append(worker)
            self.sub_receivers.append(receiver)

    def stop(self):
        """Set the stop event, stop every sub-receiver, then join the threads."""
        self.stop_event.set()
        for receiver in self.sub_receivers:
            try:
                receiver.stop()
            except Exception:
                # Best-effort: one failing receiver must not block the others.
                continue
        # Each join waits at most five seconds.
        for worker in self.threads:
            worker.join(timeout=5.0)

    def get_frame(self, stream_name: str):
        """Return the frame store's entry for ``stream_name``."""
        return self.frame_store.get_frame(stream_name)

    def get_stream_names(self) -> List[str]:
        """Return the names of the streams currently in the frame store."""
        return self.frame_store.snapshot_keys()

    def _extract_port_from_stream_name(self, stream_name: str) -> int:
        """Return the integer after the last '-' in the name, or 0 if it is not a number."""
        _, _, tail = stream_name.rpartition('-')
        try:
            return int(tail)
        except (ValueError, IndexError):
            return 0

    def get_sorted_stream_names(self) -> List[str]:
        """Return the stream names ordered by their port number."""
        port_of = self._extract_port_from_stream_name
        return sorted(self.get_stream_names(), key=port_of)