1111import pulsar
1212import time
1313import functools
14- import signal
1514import inspect
16- from typing import Callable , Any , Dict , Optional , Set , List , Union , Awaitable , get_type_hints
17- from pulsar import Client , Consumer , Producer
15+ from typing import Callable , Any , Dict , Set , Union , Awaitable , get_type_hints
16+ from pulsar import Client , Producer
1817from .config import Config
1918from .metrics import Metrics , MetricsServer
2019from .context import FSContext
2423logging .basicConfig (level = logging .INFO )
2524logger = logging .getLogger (__name__ )
2625
26+
27+ def _validate_process_func (func : Callable , module_name : str ):
28+ """
29+ Validate the structure of a process function.
30+
31+ Args:
32+ func (Callable): The function to validate
33+ module_name (str): Name of the module for error messages
34+
35+ Raises:
36+ ValueError: If the function structure is invalid
37+ """
38+ # Get function signature
39+ sig = inspect .signature (func )
40+ params = list (sig .parameters .values ())
41+
42+ # Check number of parameters
43+ if len (params ) != 2 :
44+ raise ValueError (
45+ f"Process function for module '{ module_name } ' must have exactly 2 parameters, "
46+ f"got { len (params )} "
47+ )
48+
49+ # Check parameter types using type hints
50+ type_hints = get_type_hints (func )
51+ if not ("context" in type_hints and "data" in type_hints and "return" in type_hints ):
52+ raise ValueError (
53+ f"Process function for module '{ module_name } ' must have type hints for both parameters named 'context', 'data', and a return type"
54+ )
55+ def unwrap_annotated (annotation ):
56+ origin = typing .get_origin (annotation )
57+ if origin is typing .Annotated :
58+ return unwrap_annotated (typing .get_args (annotation )[0 ])
59+ return annotation
60+ def is_dict_str_any (annotation ):
61+ ann = unwrap_annotated (annotation )
62+ origin = typing .get_origin (ann )
63+ args = typing .get_args (ann )
64+ return (origin in (dict , typing .Dict )) and args == (str , Any )
65+ if not (type_hints ["context" ] == FSContext ):
66+ raise ValueError (
67+ f"Process function for module '{ module_name } ' must have FSContext as first parameter"
68+ )
69+ if not is_dict_str_any (type_hints ["data" ]):
70+ raise ValueError (
71+ f"Process function for module '{ module_name } ' must have Dict[str, Any] or dict[str, Any] as second parameter"
72+ )
73+ # Check return type
74+ return_type = type_hints .get ('return' )
75+ def is_dict_return (annotation ):
76+ ann = unwrap_annotated (annotation )
77+ origin = typing .get_origin (ann )
78+ args = typing .get_args (ann )
79+ return (origin in (dict , typing .Dict )) and args == (str , Any )
80+ def is_awaitable_dict (annotation ):
81+ ann = unwrap_annotated (annotation )
82+ origin = typing .get_origin (ann )
83+ args = typing .get_args (ann )
84+ return origin in (typing .Awaitable ,) and len (args ) == 1 and is_dict_return (args [0 ])
85+ if not (is_dict_return (return_type ) or is_awaitable_dict (return_type )):
86+ raise ValueError (
87+ f"Process function for module '{ module_name } ' must return Dict[str, Any], dict[str, Any], or Awaitable thereof, got { return_type } "
88+ )
89+
90+
2791class FSFunction :
2892 """
2993 FunctionStream Function - A serverless function handler for processing messages.
@@ -42,70 +106,6 @@ class FSFunction:
42106 context (FSContext): Context object for accessing configuration
43107 """
44108
45- def _validate_process_func (self , func : Callable , module_name : str ):
46- """
47- Validate the structure of a process function.
48-
49- Args:
50- func (Callable): The function to validate
51- module_name (str): Name of the module for error messages
52-
53- Raises:
54- ValueError: If the function structure is invalid
55- """
56- # Get function signature
57- sig = inspect .signature (func )
58- params = list (sig .parameters .values ())
59-
60- # Check number of parameters
61- if len (params ) != 2 :
62- raise ValueError (
63- f"Process function for module '{ module_name } ' must have exactly 2 parameters, "
64- f"got { len (params )} "
65- )
66-
67- # Check parameter types using type hints
68- type_hints = get_type_hints (func )
69- logging .warning (f"DEBUG: type_hints for { module_name } : { type_hints } " )
70- if not ("context" in type_hints and "data" in type_hints and "return" in type_hints ):
71- raise ValueError (
72- f"Process function for module '{ module_name } ' must have type hints for both parameters named 'context', 'data', and a return type"
73- )
74- def unwrap_annotated (annotation ):
75- origin = typing .get_origin (annotation )
76- if origin is typing .Annotated :
77- return unwrap_annotated (typing .get_args (annotation )[0 ])
78- return annotation
79- def is_dict_str_any (annotation ):
80- ann = unwrap_annotated (annotation )
81- origin = typing .get_origin (ann )
82- args = typing .get_args (ann )
83- return (origin in (dict , typing .Dict )) and args == (str , Any )
84- if not (type_hints ["context" ] == FSContext ):
85- raise ValueError (
86- f"Process function for module '{ module_name } ' must have FSContext as first parameter"
87- )
88- if not is_dict_str_any (type_hints ["data" ]):
89- raise ValueError (
90- f"Process function for module '{ module_name } ' must have Dict[str, Any] or dict[str, Any] as second parameter"
91- )
92- # Check return type
93- return_type = type_hints .get ('return' )
94- def is_dict_return (annotation ):
95- ann = unwrap_annotated (annotation )
96- origin = typing .get_origin (ann )
97- args = typing .get_args (ann )
98- return (origin in (dict , typing .Dict )) and args == (str , Any )
99- def is_awaitable_dict (annotation ):
100- ann = unwrap_annotated (annotation )
101- origin = typing .get_origin (ann )
102- args = typing .get_args (ann )
103- return origin in (typing .Awaitable ,) and len (args ) == 1 and is_dict_return (args [0 ])
104- if not (is_dict_return (return_type ) or is_awaitable_dict (return_type )):
105- raise ValueError (
106- f"Process function for module '{ module_name } ' must return Dict[str, Any], dict[str, Any], or Awaitable thereof, got { return_type } "
107- )
108-
109109 def __init__ (
110110 self ,
111111 process_funcs : Dict [str , Callable [["FSContext" , Dict [str , Any ]], Union [Dict [str , Any ], Awaitable [Dict [str , Any ]]]]],
@@ -136,7 +136,7 @@ def __init__(
136136 raise ValueError (f"Process function not found for module: { module } " )
137137
138138 # Validate function structure
139- self . _validate_process_func (process_funcs [module ], module )
139+ _validate_process_func (process_funcs [module ], module )
140140
141141 # Create authentication if specified
142142 auth = None
@@ -165,11 +165,11 @@ def __init__(
165165
166166 def _setup_consumer (self ):
167167 """
168- Set up a multi-topics consumer for all sources and request sources .
168+ Set up a multi-topics consumer for all sources and the request source .
169169
170170 This method creates a Pulsar consumer that subscribes to multiple topics
171171 specified in the configuration. It collects topics from both regular sources
172- and request sources .
172+ and the request source .
173173
174174 Raises:
175175 ValueError: If no subscription name is set or if no valid sources are found.
@@ -321,7 +321,6 @@ async def process_request(self, message):
321321 # If no response_topic is provided, use the sink topic as default
322322 if not response_topic and self .config .sink and self .config .sink .pulsar and self .config .sink .pulsar .topic :
323323 response_topic = self .config .sink .pulsar .topic
324- logger .info (f"Using sink topic as default response topic: { response_topic } " )
325324
326325 if not response_topic :
327326 logger .error ("No response_topic provided and no sink topic available" )
@@ -356,11 +355,12 @@ async def process_request(self, message):
356355 except Exception as e :
357356 logger .error (f"Error processing request: { str (e )} " )
358357 if not self ._shutdown_event .is_set ():
359- await self ._send_response (
360- response_topic ,
361- request_id ,
362- {'error' : str (e )}
363- )
358+ if request_id : # Only send the response back if the request_id exists
359+ await self ._send_response (
360+ response_topic ,
361+ request_id ,
362+ {'error' : str (e )}
363+ )
364364 self .metrics .record_request_end (False , time .time () - start_time )
365365 self .metrics .record_event (False )
366366 finally :
@@ -481,7 +481,7 @@ def __del__(self):
481481 self ._get_producer .cache_clear ()
482482 except :
483483 pass
484- if hasattr ( self , ' client' ) :
484+ if self . client is not None :
485485 try :
486486 self .client .close ()
487487 except :
0 commit comments