Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
1c626b6
First idea for the bundleCE implementation
AcquaDiGiorgio Mar 26, 2025
5763a9a
First implementation of BundlerService and BundleDB
AcquaDiGiorgio Apr 11, 2025
7ff94e3
fixed sql syntax error
AcquaDiGiorgio Apr 25, 2025
5703f81
add BundleDB integration tests
AcquaDiGiorgio Apr 25, 2025
bddbde8
fix errors obtained during integration tests
AcquaDiGiorgio Apr 25, 2025
3dbd113
Changed TEXT datatype to VARCHAR
AcquaDiGiorgio May 8, 2025
b303723
Added better templating and logging
AcquaDiGiorgio May 8, 2025
6568659
Input files returned as list
AcquaDiGiorgio May 8, 2025
74f9159
BundlerService inserted in the ConfigTemplate
AcquaDiGiorgio May 8, 2025
34b380d
Adapted BundleCE to the Service (untested)
AcquaDiGiorgio May 8, 2025
b928d65
pre-commit
AcquaDiGiorgio May 8, 2025
29f5585
BundleDB status changes
AcquaDiGiorgio Jun 3, 2025
72bb46e
BundlerTemplates refactor
AcquaDiGiorgio Jun 3, 2025
4240f6d
Bundle - Status and task related changes
AcquaDiGiorgio Jun 3, 2025
8f71edd
General changes
AcquaDiGiorgio Jun 6, 2025
1605105
First working implementation
AcquaDiGiorgio Jun 25, 2025
eb12ec2
First complete version
AcquaDiGiorgio Jul 9, 2025
03b5373
Added a proper individual job status notification and output retrieval
AcquaDiGiorgio Jul 21, 2025
e227380
Setup bundled CE proxy
AcquaDiGiorgio Sep 17, 2025
63c2ec2
Add new table for long input treatment
AcquaDiGiorgio Sep 17, 2025
9453553
Change input insertion to DB and Bundled Job status retrieval
AcquaDiGiorgio Sep 17, 2025
699b66f
Remove unnecessary background process
AcquaDiGiorgio Sep 17, 2025
0fa0012
Preprocess job wrapper offline at node
AcquaDiGiorgio Sep 17, 2025
27d7098
Improved output retrieval
AcquaDiGiorgio Oct 1, 2025
6d9dc67
Added extra runner file for better control and monitoring
AcquaDiGiorgio Oct 1, 2025
47f950e
Added a timestamp to avoid bundle stalling when there are not enough …
AcquaDiGiorgio Oct 1, 2025
f1be218
Remove unnecessary status files
AcquaDiGiorgio Oct 8, 2025
c2c6a7a
Fix job insertion in running or finished bundle
AcquaDiGiorgio Oct 8, 2025
502f3d7
Remove testing code
AcquaDiGiorgio Oct 8, 2025
a96a870
UNTESTED: Added agent to monitor bundles
AcquaDiGiorgio Oct 8, 2025
2a590cb
Modified ce.submitJob to be the same as the submission through the Jo…
AcquaDiGiorgio Oct 17, 2025
2a896fa
Extended BundleManagerAgent ConfigTemplate
AcquaDiGiorgio Oct 17, 2025
3861308
Changed the template to its original format
AcquaDiGiorgio Oct 17, 2025
c22ff2c
Added flags to control Bundle stages and accept the JobID obtained th…
AcquaDiGiorgio Oct 17, 2025
6706bde
Updated agent to be able to force-submit stalled bundles
AcquaDiGiorgio Oct 17, 2025
874c81a
Pre-commit
AcquaDiGiorgio Oct 17, 2025
0f3025e
Generalize Bundle Status using PilotStatus
AcquaDiGiorgio Oct 23, 2025
4317fba
Send heartbeat to maintain bundles alive against StalledJobAgent
AcquaDiGiorgio Oct 23, 2025
dd11768
fix(PushJobAgent): Bug while obtaining job output in failed job wrappers
AcquaDiGiorgio Mar 2, 2026
232fb89
chore: Clean and document
AcquaDiGiorgio Mar 2, 2026
5b04cc1
chore: Remove unnecessary _cleanFinishedBundles at BundleManagerAgent
AcquaDiGiorgio Mar 2, 2026
e0e31d6
chore: Remove unnecessary procyPath at BundleDB
AcquaDiGiorgio Mar 2, 2026
97c2b24
pre-commit
AcquaDiGiorgio Mar 2, 2026
48ec52c
chore: Remove ExecTemplate from BundleCE and BundleDB
AcquaDiGiorgio Mar 2, 2026
720baa6
Merge branch 'DIRACGrid:integration' into BundleCE
AcquaDiGiorgio Mar 2, 2026
550a3fb
chore: Remove debugging code
AcquaDiGiorgio Mar 2, 2026
3f1244e
chore(BundleDB): split getJobsOfBundle in 2 functions
AcquaDiGiorgio Mar 3, 2026
44e7048
fix(Bundler): Bug, inputs instead of executables while wrapping
AcquaDiGiorgio Mar 3, 2026
73be1e7
chore(BundleCE): Improve job output obtaining, tmp dir no longer needed
AcquaDiGiorgio Mar 3, 2026
2f54d90
chore: Remove unused imports
AcquaDiGiorgio Mar 3, 2026
108f60f
fix(BundleDB): Add extra safeguard while selecting best bundle
AcquaDiGiorgio Mar 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def _writeXRSL(self, executableFile, inputs, outputs):

def _bundlePreamble(self, executableFile):
"""Bundle the preamble with the executable file"""
wrapperContent = f"{self.preamble}\n./{executableFile}"
wrapperContent = f"{self.preamble}\n./{os.path.basename(executableFile)}"

# We need to make sure the executable file can be executed by the wrapper
# By adding the execution mode to the file, the file will be processed as an "executable" in the XRSL
Expand Down
115 changes: 115 additions & 0 deletions src/DIRAC/Resources/Computing/AREXEnhancedComputingElement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import os
import shutil

from DIRAC import S_ERROR, S_OK
from DIRAC.Resources.Computing.AREXComputingElement import AREXComputingElement


class AREXEnhancedComputingElement(AREXComputingElement):
    """AREX computing element that retrieves job outputs recursively.

    The output listing is handled as a mapping with optional ``"file"`` and
    ``"dir"`` entries, which lets :meth:`getJobOutput` descend into the
    sub-directories of the remote session directory and mirror them locally.
    """

    @staticmethod
    def _listingEntries(listing, key):
        """Return the entries stored under ``key`` in an output listing.

        :param listing: decoded listing of a remote session directory
        :param str key: entry type to extract (``"file"`` or ``"dir"``)
        :return: the entries found under ``key``, or an empty list if the key is absent
        """
        if key not in listing:
            return []
        entries = listing[key]
        # NOTE(review): presumably a listing holding a single entry may come back
        # as a bare string rather than a list - confirm against the ARC REST
        # interface. Wrapping it avoids iterating over the name character by character.
        if isinstance(entries, str):
            return [entries]
        return entries

    def _getListOfAvailableOutputs(self, jobID, arcJobID, path=None):
        """Request the listing of the outputs available for a given jobID.

        :param str jobID: job reference without the DIRAC stamp
        :param str arcJobID: ARC job ID
        :param str path: remote path, relative to the session directory of the job
        :return: S_OK(decoded JSON listing) or S_ERROR if the request fails,
                 the response is empty, or the response is not valid JSON
        """
        query = self._urlJoin(os.path.join("jobs", arcJobID, "session", path or ""))

        # Submit the GET request to retrieve the names of the outputs
        self.log.debug(f"Retrieving the names of the outputs with {query}")
        result = self._request("get", query)
        if not result["OK"]:
            self.log.error("Failed to retrieve at least some outputs", f"for {jobID}: {result['Message']}")
            return S_ERROR(f"Failed to retrieve at least some outputs for {jobID}")
        response = result["Value"]

        if not response.text:
            return S_ERROR(f"There is no output for job {jobID}")

        # A malformed body must be reported as S_ERROR like the other failures
        # of this method, instead of raising up to the caller.
        try:
            listing = response.json()
        except ValueError as e:
            self.log.error("Failed to decode the list of outputs", f"for {jobID}: {e}")
            return S_ERROR(f"Failed to decode the list of outputs for {jobID}")

        return S_OK(listing)

    def getJobOutput(self, jobID, workingDirectory=None, path=None):
        """Get the outputs of the given job reference.

        Outputs are stored in workingDirectory if present, else in a new directory named <ARC JobID>.
        Remote sub-directories are retrieved recursively into matching local sub-directories.
        A failure while retrieving a sub-directory is logged and does not interrupt
        the retrieval of the remaining outputs, whereas a failure while downloading
        a file of the current directory aborts the call.

        :param str jobID: job reference followed by the DIRAC stamp.
        :param str workingDirectory: name of the directory containing the retrieved outputs.
        :param str path: remote path, relative to the session directory of the job
        :return: S_OK((stdout, stderr)) with the content of ``<stamp>.out`` and ``<stamp>.err``
                 (each None if not found among the files of ``path``), or S_ERROR
        """
        result = self._checkSession()
        if not result["OK"]:
            self.log.error("Cannot get job outputs", result["Message"])
            return result

        # Extract stamp from the Job ID
        if ":::" not in jobID:
            return S_ERROR(f"DIRAC stamp not defined for {jobID}")
        jobRef, stamp = jobID.split(":::")
        arcJob = self._jobReferenceToArcID(jobRef)

        # Get the list of available outputs
        result = self._getListOfAvailableOutputs(jobRef, arcJob, path)
        if not result["OK"]:
            return result
        remoteOutputs = result["Value"]
        self.log.debug("Outputs to get are", remoteOutputs)

        remoteOutputsFiles = self._listingEntries(remoteOutputs, "file")
        remoteOutputsDirs = self._listingEntries(remoteOutputs, "dir")

        if not workingDirectory:
            if "WorkingDirectory" in self.ceParameters:
                workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
            else:
                workingDirectory = arcJob

        # exist_ok avoids the check-then-create race, and missing parents are created too
        os.makedirs(workingDirectory, exist_ok=True)

        # Directories
        for remoteDir in remoteOutputsDirs:
            result = self.getJobOutput(
                jobID,
                workingDirectory=os.path.join(workingDirectory, remoteDir),
                path=os.path.join(path or "", remoteDir),
            )
            if not result["OK"]:
                # Best effort: report the failure instead of silently dropping it
                self.log.error(
                    "Failed to retrieve output directory",
                    f"{remoteDir} for {jobID}: {result['Message']}",
                )

        # Files
        stdout = None
        stderr = None
        for remoteOutput in remoteOutputsFiles:
            # Prepare the command
            query = self._urlJoin(os.path.join("jobs", arcJob, "session", path or "", remoteOutput))

            # Submit the GET request to retrieve outputs
            result = self._request("get", query, stream=True)
            if not result["OK"]:
                self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
                return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
            response = result["Value"]

            # Stream the payload to disk rather than loading it in memory
            localOutput = os.path.join(workingDirectory, remoteOutput)
            with open(localOutput, "wb") as f:
                shutil.copyfileobj(response.raw, f)

            # The standard output/error of the job are named after the DIRAC stamp
            if remoteOutput == f"{stamp}.out":
                with open(localOutput) as f:
                    stdout = f.read()
            if remoteOutput == f"{stamp}.err":
                with open(localOutput) as f:
                    stderr = f.read()

        return S_OK((stdout, stderr))
Loading
Loading