Skip to content

Commit d4e388d

Browse files
authored
Merge pull request #206 from Sahil-u07/fix/zmq-cleanup-on-exit
fix: add atexit and signal handlers for ZMQ cleanup
2 parents 36d25eb + 6355f7b commit d4e388d

1 file changed

Lines changed: 35 additions & 1 deletion

File tree

concore.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import re
88
import zmq
99
import numpy as np
10+
import signal
11+
1012
logging.basicConfig(
1113
level=logging.INFO,
1214
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
@@ -81,6 +83,7 @@ def recv_json_with_retry(self):
8183

8284
# Global ZeroMQ ports registry
8385
zmq_ports = {}
86+
_cleanup_in_progress = False
8487

8588
def init_zmq_port(port_name, port_type, address, socket_type_str):
8689
"""
@@ -107,14 +110,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str):
107110
logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
108111

109112
def terminate_zmq():
110-
for port in zmq_ports.values():
113+
"""Clean up all ZMQ sockets and contexts before exit."""
114+
global _cleanup_in_progress
115+
116+
if _cleanup_in_progress:
117+
return # Already cleaning up, prevent reentrant calls
118+
119+
if not zmq_ports:
120+
return # No ports to clean up
121+
122+
_cleanup_in_progress = True
123+
print("\nCleaning up ZMQ resources...")
124+
for port_name, port in zmq_ports.items():
111125
try:
112126
port.socket.close()
113127
port.context.term()
128+
print(f"Closed ZMQ port: {port_name}")
114129
except Exception as e:
115130
logging.error(f"Error while terminating ZMQ port {port.address}: {e}")
131+
zmq_ports.clear()
132+
_cleanup_in_progress = False
116133

134+
def signal_handler(sig, frame):
135+
"""Handle interrupt signals gracefully."""
136+
print(f"\nReceived signal {sig}, shutting down gracefully...")
137+
# Prevent terminate_zmq from being called twice: once here and once via atexit
138+
try:
139+
atexit.unregister(terminate_zmq)
140+
except Exception:
141+
# If unregister fails for any reason, proceed with explicit cleanup anyway
142+
pass
143+
terminate_zmq()
144+
sys.exit(0)
145+
146+
# Register cleanup handlers
117147
atexit.register(terminate_zmq)
148+
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
149+
if not hasattr(sys, 'getwindowsversion'):
150+
signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only)
151+
118152
# --- ZeroMQ Integration End ---
119153

120154

0 commit comments

Comments
 (0)