Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions src/DIRAC/Core/DISET/ServiceReactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.ConfigurationSystem.Client import PathFinder

#: Time during which the service does not accept new requests and handles those in the queue, if the backlog is too large
#: This sleep is repeated for as long as Service.wantsThrottle is truthy
THROTTLE_SERVICE_SLEEP_SECONDS = 0.25


class ServiceReactor:
__transportExtraKeywords = {
Expand Down Expand Up @@ -200,7 +196,6 @@ def __acceptIncomingConnection(self, svcName=False):
services at the same time
"""
sel = self.__getListeningSelector(svcName)
throttleExpires = None
while self.__alive:
clientTransport = None
try:
Expand All @@ -224,16 +219,27 @@ def __acceptIncomingConnection(self, svcName=False):
gLogger.warn(f"Client connected from banned ip {clientIP}")
clientTransport.close()
continue
# Handle throttling
if self.__services[svcName].wantsThrottle and throttleExpires is None:
throttleExpires = time.time() + THROTTLE_SERVICE_SLEEP_SECONDS
if throttleExpires:
if time.time() > throttleExpires:
throttleExpires = None
else:
gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress()))
# Handle throttling: reject all connections while overloaded
# to prevent queue growth when threads are stuck.
# wantsThrottle also handles state tracking and diagnostic logging.
svc = self.__services[svcName]
if svc.wantsThrottle:
# Check if throttle has exceeded the maximum allowed duration
maxThrottleDuration = svc.getConfig().getMaxThrottleDuration()
if maxThrottleDuration > 0 and svc.throttleDuration > maxThrottleDuration:
diag = svc.throttleDiagnostics()
gLogger.fatal(
f"Service {svcName} stuck in throttle, initiating process restart",
f"duration={svc.throttleDuration:.0f}s (limit: {maxThrottleDuration}s), "
f"queue={diag['queue']}/{diag['maxQueue']}, "
f"threads={diag['threads']}/{diag['maxThreads']}",
)
clientTransport.close()
continue
self.__alive = False
return
gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress()))
clientTransport.close()
continue
# Handle connection
self.__stats.connectionStablished()
self.__services[svcName].handleConnection(clientTransport)
Expand Down
57 changes: 55 additions & 2 deletions src/DIRAC/Core/DISET/private/Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
from DIRAC.FrameworkSystem.Client.SecurityLogClient import SecurityLogClient


#: Minimum number of seconds between two consecutive "still throttling" warnings
#: logged while a service stays in throttle mode
THROTTLE_LOG_INTERVAL_SECONDS = 30


class Service:
SVC_VALID_ACTIONS = {"RPC": "export", "FileTransfer": "transfer", "Message": "msg", "Connection": "Message"}
SVC_SECLOG_CLIENT = SecurityLogClient()
Expand Down Expand Up @@ -62,6 +66,8 @@ def __init__(self, serviceData):
self._transportPool = getGlobalTransportPool()
self.__cloneId = 0
self.__maxFD = 0
self._throttleStartedAt = None
self._lastThrottleLog = 0
self.activityMonitoring = False
# Check if monitoring is enabled
if "Monitoring" in Operations().getMonitoringBackends(monitoringType="ServiceMonitoring"):
Expand Down Expand Up @@ -282,9 +288,56 @@ def err_handler(result):

@property
def wantsThrottle(self):
    """Tell whether the service wants new connections to be rejected.

    The service throttles when the thread pool's work queue holds more
    entries than ``getMaxWaitingPetitions()`` allows.

    Reading this property also maintains the throttle bookkeeping:

    * when throttling starts, ``_throttleStartedAt`` and ``_lastThrottleLog``
      are set to the current time and a warning is logged;
    * while throttling continues, a warning carrying the elapsed duration is
      logged at most once every ``THROTTLE_LOG_INTERVAL_SECONDS``;
    * when throttling stops, the elapsed duration is logged and
      ``_throttleStartedAt`` is reset to ``None``.

    :return: ``True`` if incoming connections should be rejected
    """
    nQueued = self._threadPool._work_queue.qsize()
    shouldThrottle = nQueued > self._cfg.getMaxWaitingPetitions()

    now = time.time()
    if shouldThrottle:
        if self._throttleStartedAt is None:
            # Transition into throttle mode: start the clock and report once
            self._throttleStartedAt = now
            self._lastThrottleLog = now
            diag = self.throttleDiagnostics()
            gLogger.warn(
                f"Service {self._name} entering throttle mode",
                f"queue={diag['queue']}/{diag['maxQueue']}, threads={diag['threads']}/{diag['maxThreads']}",
            )
        elif now - self._lastThrottleLog >= THROTTLE_LOG_INTERVAL_SECONDS:
            # Still overloaded: repeat the warning, rate limited by the log interval
            duration = now - self._throttleStartedAt
            diag = self.throttleDiagnostics()
            gLogger.warn(
                f"Service {self._name} still throttling",
                f"duration={duration:.0f}s, queue={diag['queue']}/{diag['maxQueue']}, "
                f"threads={diag['threads']}/{diag['maxThreads']}",
            )
            self._lastThrottleLog = now
    elif self._throttleStartedAt is not None:
        # Transition out of throttle mode: report how long it lasted and reset
        duration = now - self._throttleStartedAt
        gLogger.info(f"Service {self._name} throttle cleared", f"duration={duration:.1f}s")
        self._throttleStartedAt = None

    return shouldThrottle

@property
def throttleDuration(self):
    """Report how long the current throttle episode has lasted.

    :return: ``0`` when ``_throttleStartedAt`` is ``None`` (no throttle in
             progress), otherwise the number of seconds between that
             timestamp and the current ``time.time()``
    """
    startedAt = self._throttleStartedAt
    return 0 if startedAt is None else time.time() - startedAt

def throttleDiagnostics(self):
    """Collect the figures reported in throttle log messages.

    :return: dict with the keys ``queue`` (current size of the thread pool's
             work queue), ``maxQueue`` (value of ``getMaxWaitingPetitions()``),
             ``threads`` (number of entries in the thread pool's ``_threads``)
             and ``maxThreads`` (value of ``getMaxThreads()``)
    """
    pool = self._threadPool
    config = self._cfg
    snapshot = {}
    snapshot["queue"] = pool._work_queue.qsize()
    snapshot["maxQueue"] = config.getMaxWaitingPetitions()
    snapshot["threads"] = len(pool._threads)
    snapshot["maxThreads"] = config.getMaxThreads()
    return snapshot

# Threaded process function
def _processInThread(self, clientTransport):
Expand Down
12 changes: 12 additions & 0 deletions src/DIRAC/Core/DISET/private/ServiceConfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ def getURL(self):
self.setURL(serviceURL)
return serviceURL

def getMaxThrottleDuration(self):
    """Return the ``MaxThrottleDuration`` option as a whole number of seconds.

    This is the longest time a service may stay in throttle mode before a
    restart is triggered, so that the process supervisor (e.g. runsv) can
    bring the service back up cleanly.

    :return: the option converted with ``int()``; ``0`` (auto-restart
             disabled, the default) when the option cannot be read or
             converted
    """
    try:
        rawValue = self.getOption("MaxThrottleDuration")
        seconds = int(rawValue)
    except Exception:
        # Best effort: any failure to read or convert the option disables the feature
        seconds = 0
    return seconds

def getContextLifeTime(self):
optionValue = self.getOption("ContextLifeTime")
try:
Expand Down
Loading