Skip to content

Commit 7944a00

Browse files
authored
Merge pull request #414 from Baltic-RCC/dev
Release v1.4.4
2 parents 22b5f29 + 2a781e1 commit 7944a00

File tree

5 files changed

+241
-23
lines changed

5 files changed

+241
-23
lines changed

config/cgm_worker/merger.properties

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[MAIN]
2-
INPUT_RABBIT_QUE = emf-merge.tasks.rmm
3-
OUTPUT_RMQ_EXCHANGE = emf-quality.test
2+
CONSUMER_TYPE = SINGLE_MESSAGE
3+
INPUT_RABBIT_QUEUE = emf.merge.tasks.rmm
4+
OUTPUT_RMQ_EXCHANGE = emf.quality.models
45
INPUT_MINIO_BUCKET = opde-confidential-models
56
INPUT_MINIO_FOLDER = IGM
67
OUTPUT_MINIO_BUCKET = opde-confidential-models

config/model_validator/model_validator.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
[MAIN]
2+
CONSUMER_TYPE = SINGLE_MESSAGE
23
INPUT_RMQ_QUEUE = object-storage.models.validation
34
OUTPUT_RMQ_EXCHANGE = emf-quality.test
45
VALIDATION_ELK_INDEX = emfos-igm-validation

emf/common/integrations/rabbit.py

Lines changed: 187 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import logging
44
import pika
55
import config
6-
from typing import List
6+
import traceback
7+
import signal
8+
from typing import List, Optional
79
from emf.common.config_parser import parse_app_properties
810
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
911

@@ -158,6 +160,190 @@ def __del__(self):
158160
self.close()
159161

160162

163+
class SingleMessageConsumer:
164+
def __init__(self,
165+
host: str = RMQ_SERVER,
166+
port: int = int(RMQ_PORT),
167+
vhost: str = RMQ_VHOST,
168+
queue: str | None = None,
169+
username: str = RMQ_USERNAME,
170+
password: str = RMQ_PASSWORD,
171+
forward: Optional[str] = None,
172+
message_handlers: Optional[List[object]] = None,
173+
message_converter: Optional[object] = None,
174+
heartbeat: int = int(RMQ_HEARTBEAT_IN_SEC),
175+
socket_timeout: Optional[float] = None,
176+
blocked_connection_timeout: float = 600.0,
177+
connection_attempts: int = 5,
178+
retry_delay: int = 3,
179+
log_body: bool = False):
180+
self._host, self._port, self._vhost = host, int(port), vhost
181+
self._queue = queue
182+
self._username, self._password = username, password
183+
self.forward = forward
184+
self.message_handlers = message_handlers or []
185+
self.message_converter = message_converter
186+
self.log_body = log_body
187+
188+
self._heartbeat = heartbeat
189+
self._socket_timeout = socket_timeout
190+
self._blocked_connection_timeout = blocked_connection_timeout
191+
self._connection_attempts = connection_attempts
192+
self._retry_delay = retry_delay
193+
194+
self._connection: Optional[pika.BlockingConnection] = None
195+
self._channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
196+
self._in_shutdown = False
197+
198+
self._executor = ThreadPoolExecutor(max_workers=1)
199+
200+
signal.signal(signal.SIGTERM, self._on_term_signal)
201+
signal.signal(signal.SIGINT, self._on_term_signal)
202+
203+
def _params(self) -> pika.ConnectionParameters:
204+
return pika.ConnectionParameters(
205+
host=self._host,
206+
port=self._port,
207+
virtual_host=self._vhost,
208+
credentials=pika.PlainCredentials(self._username, self._password),
209+
heartbeat=self._heartbeat,
210+
blocked_connection_timeout=self._blocked_connection_timeout,
211+
connection_attempts=self._connection_attempts,
212+
retry_delay=self._retry_delay,
213+
socket_timeout=self._socket_timeout,
214+
client_properties={"connection_name": "keda-single-shot"},
215+
)
216+
217+
def connect(self):
218+
logger.info(f"Connecting to RabbitMQ at {self._host}:{self._port} vhost='{self._vhost}'")
219+
self._connection = pika.BlockingConnection(self._params())
220+
self._channel = self._connection.channel()
221+
logger.info("Connection established and channel opened")
222+
223+
def close(self):
224+
try:
225+
if self._channel and self._channel.is_open:
226+
self._channel.close()
227+
except Exception as e:
228+
logger.warning(f"Error closing channel: {e}")
229+
try:
230+
if self._connection and self._connection.is_open:
231+
self._connection.close()
232+
except Exception as e:
233+
logger.warning(f"Error closing connection: {e}")
234+
self._executor.shutdown(wait=False, cancel_futures=True)
235+
236+
def _on_term_signal(self, signum, _frame):
237+
self._in_shutdown = True
238+
logger.warning(f"Received signal {signum}; will exit after current message finishes.")
239+
240+
# -------- worker function (no channel ops here) --------
241+
def _process_messages(self, basic_deliver, properties, body):
242+
ack = True
243+
err = None
244+
245+
# Convert if needed
246+
if self.message_converter:
247+
try:
248+
body, content_type = self.message_converter.convert(body)
249+
if properties is None:
250+
properties = pika.BasicProperties(content_type=content_type)
251+
else:
252+
properties.content_type = content_type
253+
logger.info("Message converted")
254+
except Exception as error:
255+
logger.error(f"Message conversion failed: {error}\n{traceback.format_exc()}")
256+
ack = False
257+
err = error
258+
259+
# Handlers
260+
if ack and self.message_handlers:
261+
for message_handler in self.message_handlers:
262+
try:
263+
logger.info(f"Handling message with handler: {message_handler.__class__.__name__}")
264+
body, properties = message_handler.handle(body, properties=properties, channel=None)
265+
except Exception as error:
266+
logger.error(f"Message handling failed: {error}\n{traceback.format_exc()}")
267+
logger.exception("Message handling failed, see traceback in document")
268+
ack = False
269+
err = error
270+
break
271+
272+
return ack, body, properties, err, basic_deliver.delivery_tag
273+
274+
# -------- single-message main --------
275+
def run(self) -> int:
276+
"""
277+
Exit codes:
278+
0 -> processed OK or queue empty
279+
2 -> conversion/handler failed (rejected)
280+
3 -> connection/setup error
281+
"""
282+
try:
283+
self.connect()
284+
except Exception as e:
285+
logger.error(f"Failed to connect to RabbitMQ: {e}")
286+
return 3
287+
288+
try:
289+
method, properties, body = self._channel.basic_get(self._queue, auto_ack=False)
290+
if not method:
291+
logger.warning(f"No message available in queue '{self._queue}', exiting")
292+
return 0
293+
294+
delivery_tag = method.delivery_tag
295+
logger.info(f"Received message #{delivery_tag} from {getattr(properties,'app_id',None)} meta: {getattr(properties,'headers',None)}")
296+
if self.log_body:
297+
logger.debug(f"Message body: {body!r}")
298+
299+
future = self._executor.submit(self._process_messages, method, properties, body)
300+
301+
# keep heartbeats flowing while waiting for worker completion
302+
while not future.done():
303+
try:
304+
self._connection.process_data_events(time_limit=0)
305+
except Exception:
306+
pass
307+
time.sleep(0.25)
308+
309+
ack, out_body, out_props, err, dtag = future.result()
310+
311+
# Check if properties has some status flag set from handler
312+
_success = out_props.headers.get('success', True)
313+
314+
if not ack or not _success:
315+
logger.warning(f"Rejecting message due to handler failure or success flag: {_success}, error: {err}")
316+
try:
317+
self._channel.basic_reject(dtag, requeue=False)
318+
except Exception as e:
319+
logger.error(f"Failed to REJECT message #{dtag}: {e}")
320+
return 3
321+
return 2
322+
323+
if self.forward:
324+
logger.info(f"Publishing message to exchange/queue: {self.forward}")
325+
try:
326+
self._channel.basic_publish(
327+
exchange=self.forward, routing_key="", body=out_body, properties=out_props
328+
)
329+
except Exception as e:
330+
logger.error(f"Publish failed: {e}")
331+
return 3 # leave unacked for redelivery
332+
333+
try:
334+
self._channel.basic_ack(dtag)
335+
logger.info(f"ACKed message #{dtag}")
336+
if self._in_shutdown:
337+
logger.info("Shutdown requested; exiting cleanly after finishing message")
338+
return 0
339+
except Exception as e:
340+
logger.error(f"Failed to ACK message #{dtag}: {e}")
341+
return 3
342+
343+
finally:
344+
self.close()
345+
346+
161347
class RMQConsumer:
162348
"""This is an example consumer that will handle unexpected interactions
163349
with RabbitMQ such as channel and connection closures.

emf/model_merger/worker.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import sys
23
from uuid import uuid4
34
from emf.common.logging import custom_logger
45

@@ -8,7 +9,8 @@
89

910
# Initialize custom logger
1011
logger = logging.getLogger(__name__)
11-
elk_handler = custom_logger.initialize_custom_logger(extra={'worker': 'model-merger', 'worker_uuid': str(uuid4())})
12+
worker_uuid = str(uuid4())
13+
elk_handler = custom_logger.initialize_custom_logger(extra={'worker': 'model-merger', 'worker_uuid': worker_uuid})
1214

1315
import config
1416
from emf.common.integrations import rabbit
@@ -20,13 +22,26 @@
2022

2123
parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.merger)
2224

23-
# RabbitMQ consumer implementation
24-
consumer = rabbit.RMQConsumer(queue=INPUT_RABBIT_QUE,
25-
message_handlers=[HandlerMergeModels()],
26-
forward=OUTPUT_RMQ_EXCHANGE,
27-
)
25+
logger.info(f"Starting 'model-merger' worker with assigned trace uuid: {worker_uuid}")
2826

29-
try:
30-
consumer.run()
31-
except KeyboardInterrupt:
32-
consumer.stop()
27+
# RabbitMQ consumer implementation
28+
if CONSUMER_TYPE == "SINGLE_MESSAGE":
29+
# RabbitMQ single message consumer implementation aligned with KEDA usage
30+
consumer = rabbit.SingleMessageConsumer(
31+
queue=INPUT_RABBIT_QUEUE,
32+
message_handlers=[HandlerMergeModels()],
33+
forward=OUTPUT_RMQ_EXCHANGE,
34+
)
35+
sys.exit(consumer.run())
36+
elif CONSUMER_TYPE == "LONG_LIVING":
37+
# RabbitMQ long-living consumer implementation
38+
consumer = rabbit.RMQConsumer(queue=INPUT_RABBIT_QUEUE,
39+
message_handlers=[HandlerMergeModels()],
40+
forward=OUTPUT_RMQ_EXCHANGE,
41+
)
42+
try:
43+
consumer.run()
44+
except KeyboardInterrupt:
45+
consumer.stop()
46+
else:
47+
raise Exception("Unknown CONSUMER_TYPE, please check the config/cgm_worker/merger.properties file")

emf/model_validator/worker.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import logging
2+
import sys
23
from uuid import uuid4
34
from emf.common.logging import custom_logger
45
from emf.model_validator.model_validator import HandlerModelsValidator
56

67
# Initialize custom logger
78
logger = logging.getLogger(__name__)
8-
elk_handler = custom_logger.initialize_custom_logger(extra={'worker': 'model-validator', 'worker_uuid': str(uuid4())})
9+
worker_uuid = str(uuid4())
10+
elk_handler = custom_logger.initialize_custom_logger(extra={'worker': 'model-validator', 'worker_uuid': worker_uuid})
911

1012
import config
1113
from emf.common.integrations import rabbit
@@ -16,13 +18,26 @@
1618

1719
parse_app_properties(caller_globals=globals(), path=config.paths.model_validator.model_validator)
1820

19-
# RabbitMQ consumer implementation
20-
consumer = rabbit.RMQConsumer(queue=INPUT_RMQ_QUEUE,
21-
message_handlers=[HandlerModelsValidator()],
22-
forward=OUTPUT_RMQ_EXCHANGE,
23-
)
21+
logger.info(f"Starting 'model-validator' worker with assigned trace uuid: {worker_uuid}")
2422

25-
try:
26-
consumer.run()
27-
except KeyboardInterrupt:
28-
consumer.stop()
23+
# RabbitMQ consumer implementation
24+
if CONSUMER_TYPE == "SINGLE_MESSAGE":
25+
# RabbitMQ single message consumer implementation aligned with KEDA usage
26+
consumer = rabbit.SingleMessageConsumer(
27+
queue=INPUT_RMQ_QUEUE,
28+
message_handlers=[HandlerModelsValidator()],
29+
forward=OUTPUT_RMQ_EXCHANGE,
30+
)
31+
sys.exit(consumer.run())
32+
elif CONSUMER_TYPE == "LONG_LIVING":
33+
# RabbitMQ long-living consumer implementation
34+
consumer = rabbit.RMQConsumer(queue=INPUT_RMQ_QUEUE,
35+
message_handlers=[HandlerModelsValidator()],
36+
forward=OUTPUT_RMQ_EXCHANGE,
37+
)
38+
try:
39+
consumer.run()
40+
except KeyboardInterrupt:
41+
consumer.stop()
42+
else:
43+
raise Exception("Unknown CONSUMER_TYPE, please check the config/model_validator/model_validator.properties file")

0 commit comments

Comments
 (0)