@@ -69,9 +69,23 @@ def load_module_from_path(module_name, path_to_module):
6969 basestring = str
7070
7171try :
72- from Pilot .proxyTools import getVO , BaseRequest , TokenBasedRequest , extract_diracx_payload
72+ from Pilot .proxyTools import (
73+ getVO ,
74+ BaseRequest ,
75+ TokenBasedRequest ,
76+ extract_diracx_payload ,
77+ refreshPilotToken ,
78+ refreshUserToken
79+ )
7380except ImportError :
74- from proxyTools import getVO , BaseRequest , TokenBasedRequest , extract_diracx_payload
81+ from proxyTools import (
82+ getVO ,
83+ BaseRequest ,
84+ TokenBasedRequest ,
85+ extract_diracx_payload ,
86+ refreshPilotToken ,
87+ refreshUserToken
88+ )
7589
7690try :
7791 FileNotFoundError # pylint: disable=used-before-assignment
@@ -525,9 +539,10 @@ def __init__(
525539 isPilotLoggerOn = True ,
526540 pilotUUID = "unknown" ,
527541 flushInterval = 10 ,
528- bufsize = 1000 ,
542+ bufsize = 250 ,
529543 jwt = {},
530- legacy_logging = False
544+ legacy_logging = False ,
545+ clientID = ""
531546 ):
532547 """
533548 c'tor
@@ -538,7 +553,7 @@ def __init__(
538553 self .url = url
539554 self .pilotUUID = pilotUUID
540555 self .isPilotLoggerOn = isPilotLoggerOn
541- sendToURL = partial (sendMessage , url , pilotUUID , legacy_logging )
556+ sendToURL = partial (sendMessage , url , pilotUUID , legacy_logging , clientID )
542557 self .buffer = FixedSizeBuffer (sendToURL , bufsize = bufsize , autoflush = flushInterval , jwt = jwt )
543558
544559 def format_to_json (self , level , message ):
@@ -622,7 +637,7 @@ class FixedSizeBuffer(object):
622637 Once it's full, a message is sent to a remote server and the buffer is renewed.
623638 """
624639
625- def __init__ (self , senderFunc , bufsize = 1000 , autoflush = 10 , jwt = {}):
640+ def __init__ (self , senderFunc , bufsize = 250 , autoflush = 10 , jwt = {}):
626641 """
627642 Constructor.
628643
@@ -645,6 +660,10 @@ def __init__(self, senderFunc, bufsize=1000, autoflush=10, jwt={}):
645660 self ._nlines = 0
646661 self .senderFunc = senderFunc
647662 self .jwt = jwt
663+ # A fixed buffer used by a remote buffer can be deactivated:
664+ # If there's a 403/401 error, instead of crashing the pilot,
665+ # we will deactivate the log sending, and prefer just running the pilot.
666+ self .activated = True
648667
649668 @synchronized
650669 def write (self , content_json ):
@@ -657,13 +676,11 @@ def write(self, content_json):
657676 :return: None
658677 :rtype: None
659678 """
679+ if not self .activated :
680+ pass
660681
661682 self .output .extend (content_json )
662-
663- try :
664- self ._nlines += max (1 , len (content_json ))
665- except Exception :
666- raise ValueError (content_json )
683+ self ._nlines += max (1 , len (content_json ))
667684 self .sendFullBuffer ()
668685
669686 @synchronized
@@ -674,7 +691,11 @@ def sendFullBuffer(self):
674691 """
675692
676693 if self ._nlines >= self .bufsize :
677- self .flush ()
694+ try :
695+ self .flush ()
696+ except Exception as e :
697+ print ("Deactivating fixed size buffer due to" , str (e ))
698+ self .activated = False
678699 self .output = []
679700
680701 @synchronized
@@ -685,6 +706,9 @@ def flush(self, force=False):
685706 :return: None
686707 :rtype: None
687708 """
709+ if not self .activated :
710+ pass
711+
688712 if force or (self .output and self ._nlines > 0 ):
689713 self .senderFunc (self .jwt , self .output )
690714 self ._nlines = 0
@@ -700,7 +724,7 @@ def cancelTimer(self):
700724 self ._timer .cancel ()
701725
702726
703- def sendMessage (diracx_URL , pilotUUID , legacy = False , jwt = {}, rawMessage = []):
727+ def sendMessage (diracx_URL , pilotUUID , legacy = False , clientID = "" , jwt = {}, rawMessage = []):
704728 """
705729 Invoke a remote method on a Tornado server and pass a JSON message to it.
706730
@@ -724,8 +748,21 @@ def sendMessage(diracx_URL, pilotUUID, legacy=False, jwt={}, rawMessage = []):
724748
725749 if legacy :
726750 endpoint_path = "api/pilots/legacy/message"
751+ refresh_callback = partial (
752+ refreshUserToken ,
753+ diracx_URL ,
754+ pilotUUID ,
755+ jwt ,
756+ clientID
757+ )
727758 else :
728759 endpoint_path = "api/pilots/internal/message"
760+ refresh_callback = partial (
761+ refreshPilotToken ,
762+ diracx_URL ,
763+ pilotUUID ,
764+ jwt
765+ )
729766
730767 config = TokenBasedRequest (
731768 diracx_URL = diracx_URL ,
@@ -738,7 +775,8 @@ def sendMessage(diracx_URL, pilotUUID, legacy=False, jwt={}, rawMessage = []):
738775 # Do the request
739776 _res = config .executeRequest (
740777 raw_data = raw_data ,
741- json_output = False
778+ json_output = False ,
779+ refresh_callback = refresh_callback
742780 )
743781
744782
@@ -775,7 +813,8 @@ def __init__(self, pilotParams):
775813 flushInterval = interval ,
776814 bufsize = bufsize ,
777815 jwt = pilotParams .jwt ,
778- legacy_logging = pilotParams .isLegacyLogging
816+ legacy_logging = pilotParams .isLegacyLogging ,
817+ clientID = pilotParams .clientID
779818 )
780819
781820 self .log .isPilotLoggerOn = isPilotLoggerOn
@@ -976,7 +1015,7 @@ def __init__(self):
9761015 self .loggerURL = None
9771016 self .isLegacyLogging = False
9781017 self .loggerTimerInterval = 0
979- self .loggerBufsize = 1000
1018+ self .loggerBufsize = 250
9801019 self .pilotUUID = "unknown"
9811020 self .modules = ""
9821021 self .userEnvVariables = ""
0 commit comments