882 changes: 861 additions & 21 deletions pixi.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -31,6 +31,7 @@ dependencies = [
"diracx-client>=0.0.8",
"diracx-cli>=0.0.8",
"lbprodrun",
"LHCbDIRAC @ git+https://git@gitlab.cern.ch/jlisalab/LHCbDIRAC.git@modules-to-cwl-migration", # Temporary fork dependency
"pydantic",
"pyyaml",
"typer",
@@ -78,7 +79,7 @@ allow_redefinition = true
enable_error_code = ["import", "attr-defined"]

[[tool.mypy.overrides]]
module = ["requests", "yaml"]
module = ["requests", "yaml", "DIRAC.*", "LHCbDIRAC.*", "DIRACCommon.*"]
ignore_missing_imports = true

[tool.pytest.ini_options]
4 changes: 3 additions & 1 deletion src/dirac_cwl/commands/__init__.py
@@ -1,5 +1,7 @@
"""Command classes for workflow pre/post-processing operations."""

from .bookkeeping_report import BookkeepingReport
from .core import PostProcessCommand, PreProcessCommand
from .upload_log_file import UploadLogFile

__all__ = ["PreProcessCommand", "PostProcessCommand"]
__all__ = ["PreProcessCommand", "PostProcessCommand", "UploadLogFile", "BookeepingReport"]
159 changes: 159 additions & 0 deletions src/dirac_cwl/commands/bookkeeping_report.py
@@ -0,0 +1,159 @@
"""LHCb command for bookkeeping report file generation based on the XMLSummary and the XML catalog."""

import os

from DIRAC.Workflow.Utilities.Utils import getStepCPUTimes
from LHCbDIRAC.BookkeepingSystem.Client.BookkeepingClient import BookkeepingClient
from LHCbDIRAC.Core.Utilities.ProductionData import constructProductionLFNs
from LHCbDIRAC.Core.Utilities.XMLSummaries import XMLSummary
from LHCbDIRAC.Workflow.Modules.BookkeepingReport import (
_generate_xml_object,
_generateInputFiles,
_generateOutputFiles,
_prepare_job_info,
_process_time,
)
from LHCbDIRAC.Workflow.Modules.ModulesUtilities import getNumberOfProcessorsToUse

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .core import PostProcessCommand
from .utils import prepare_lhcb_workflow_commons


class BookkeepingReport(PostProcessCommand):
"""Generates a bookkeeping report file based on the XMLSummary and the pool XML catalog."""

def execute(self, job_path, **kwargs):
"""Execute the command.

:param job_path: Path to the job working directory.
:param kwargs: Additional keyword arguments.
"""
# Obtain Workflow Commons
workflow_commons_path = kwargs.get("workflow-commons-path", os.path.join(job_path, "workflow_commons.json"))

workflow_commons = prepare_lhcb_workflow_commons(
workflow_commons_path,
extra_mandatory_values=[
"bk_step_id",
],
extra_default_values={
"bookkeeping_LFNs": [],
"size": {},
"md5": {},
"guid": {},
"sim_description": "NoSimConditions",
},
)

if not workflow_commons["step_status"]["OK"]:
return

# Setup variables
start_time = workflow_commons.get("start_time", None)

cpu_times = {}
if start_time:
cpu_times["StartTime"] = start_time
if "start_stats" in workflow_commons:
cpu_times["StartStats"] = workflow_commons["start_stats"]

exectime, cputime = getStepCPUTimes(cpu_times)

number_of_processors = getNumberOfProcessorsToUse(
workflow_commons["job_id"], workflow_commons["max_number_of_processors"]
)

bk_client = BookkeepingClient()

parameters = {
"PRODUCTION_ID": workflow_commons["production_id"],
"JOB_ID": workflow_commons["prod_job_id"],
"configVersion": workflow_commons["config_version"],
"outputList": workflow_commons["outputs"],
"configName": workflow_commons["config_name"],
"outputDataFileMask": workflow_commons["output_data_file_mask"],
}

if "bookkeeping_LFNs" in workflow_commons and "production_output_data" in workflow_commons:
bk_lfns = workflow_commons["bookkeeping_LFNs"]

if not isinstance(bk_lfns, list):
bk_lfns = [i.strip() for i in bk_lfns.split(";")]

else:
result = constructProductionLFNs(parameters, bk_client)
if not result["OK"]:
raise WorkflowProcessingException("Could not create production LFNs")

bk_lfns = result["Value"]["BookkeepingLFNs"]

ldate, ltime, ldatestart, ltimestart = _process_time(start_time)

# Obtain XMLSummary
if "xml_summary_path" in workflow_commons:
xf_o = XMLSummary(workflow_commons["xml_summary_path"])
else:
xf_o = _generate_xml_object(
workflow_commons["cleaned_application_name"],
workflow_commons["production_id"],
workflow_commons["prod_job_id"],
workflow_commons["command_number"],
workflow_commons["command_id"],
)

info_dict = {
"exectime": exectime,
"cputime": cputime,
"numberOfProcessors": number_of_processors,
"production_id": workflow_commons["production_id"],
"jobID": workflow_commons["job_id"],
"siteName": workflow_commons["site_name"],
"jobType": workflow_commons["job_type"],
"applicationName": workflow_commons["application_name"],
"applicationVersion": workflow_commons["application_version"],
"numberOfEvents": workflow_commons["number_of_events"],
}

# Generate job_info object
job_info = _prepare_job_info(
info_dict,
ldatestart,
ltimestart,
ldate,
ltime,
xf_o,
workflow_commons["inputs"],
workflow_commons["command_id"],
workflow_commons["bk_step_id"],
bk_client,
workflow_commons["config_name"],
workflow_commons["config_version"],
)

# Add input files to job_info
_generateInputFiles(job_info, bk_lfns, workflow_commons["inputs"])

# Add output files to job_info
_generateOutputFiles(
job_info,
bk_lfns,
workflow_commons["event_type"],
workflow_commons["application_name"],
xf_o,
workflow_commons["outputs"],
workflow_commons["inputs"],
)

# Generate SimulationConditions
if workflow_commons["application_name"] == "Gauss":
job_info.simulation_condition = workflow_commons["sim_description"]

# Convert job_info object to XML
doc = job_info.to_xml()

# Write to file
bfilename = f"bookkeeping_{workflow_commons['command_id']}.xml"
with open(bfilename, "wb") as bfile:
bfile.write(doc)
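
For reference, a minimal sketch of the workflow_commons.json content this command consumes, inferred from the keys read in execute above; the field names come from the code, while the example values are assumptions. prepare_lhcb_workflow_commons additionally enforces "bk_step_id" and fills in the defaults listed above when keys are absent.

# Illustrative workflow_commons content; all values are assumptions.
workflow_commons = {
    "step_status": {"OK": True},
    "job_id": 12345,
    "production_id": "00000042",
    "prod_job_id": "00012345",
    "config_name": "MC",
    "config_version": "2024",
    "bk_step_id": 1,
    "max_number_of_processors": 1,
    "inputs": [],
    "outputs": [],
    "output_data_file_mask": "",
    "event_type": "12345678",
    "application_name": "Gauss",
    "application_version": "v99r0",
    "cleaned_application_name": "gauss",
    "number_of_events": 1000,
    "site_name": "LCG.CERN.cern",
    "job_type": "MCSimulation",
    "command_id": "1",
    "command_number": 1,
}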
162 changes: 162 additions & 0 deletions src/dirac_cwl/commands/upload_log_file.py
@@ -0,0 +1,162 @@
"""Post-processing command for uploading logging information to a Storage Element."""

import glob
import os
import random
import stat
import time
import zipfile
from urllib.parse import urljoin

from DIRAC import S_ERROR, S_OK, siteName
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.Adler import fileAdler
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.DataManagementSystem.Client.FailoverTransfer import FailoverTransfer
from DIRAC.DataManagementSystem.Utilities.ResolveSE import getDestinationSEList
from DIRAC.Resources.Catalog.PoolXMLFile import getGUID
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport

from dirac_cwl.commands import PostProcessCommand


class UploadLogFile(PostProcessCommand):
"""Post-processing command for log file uploading."""

def execute(self, job_path, **kwargs):
"""Execute the log uploading process.

:param job_path: Path to the job working directory.
:param kwargs: Additional keyword arguments.
"""
# Obtain workflow information
job_id = kwargs.get("job_id", None)
production_id = kwargs.get("production_id", None)
namespace = kwargs.get("namespace", None)
config_version = kwargs.get("config_version", None)

        if not job_path or not job_id or not production_id or not namespace or not config_version:
return S_ERROR("Not enough information to perform the log upload")

ops = Operations()
log_extensions = ops.getValue("LogFiles/Extensions", [])
log_se = ops.getValue("LogStorage/LogSE", "LogSE")

job_report = JobReport(job_id)

output_files = self.obtain_output_files(job_path, log_extensions)

if not output_files:
return S_OK("No files to upload")

# Zip files
        zip_name = str(job_id).zfill(8) + ".zip"
zip_path = os.path.join(job_path, zip_name)

try:
self.zip_files(zip_path, output_files)
except (AttributeError, OSError, ValueError) as e:
job_report.setApplicationStatus("Failed to create zip of log files")
return S_OK(f"Failed to zip files: {repr(e)}")

# Obtain the log destination
zip_lfn = self.get_zip_lfn(production_id, job_id, namespace, config_version)

# Upload to the SE
result = returnSingleResult(StorageElement(log_se).putFile({zip_lfn: zip_path}))

if not result["OK"]: # Failed to uplaod to the LogSE
result = self.generate_failover_transfer(zip_path, zip_name, zip_lfn)

if not result["OK"]:
job_report.setApplicationStatus("Failed To Upload Logs")
return S_ERROR("Failed to upload to FailoverSE")

# Set the Log URL parameter
        result = returnSingleResult(StorageElement(log_se).getURL(zip_lfn, protocol="https"))
if not result["OK"]:
# The rule for interpreting what is to be deflated can be found in /eos/lhcb/grid/prod/lhcb/logSE/.htaccess
logHttpsURL = urljoin("https://lhcb-dirac-logse.web.cern.ch/lhcb-dirac-logse/", zip_lfn)
else:
logHttpsURL = result["Value"]

logHttpsURL = logHttpsURL.replace(".zip", "/")
job_report.setJobParameter("Log URL", f'<a href="{logHttpsURL}">Log file directory</a>')

return S_OK("Log Files uploaded")

    def zip_files(self, outputFile, files):
        """Zip a list of files."""
with zipfile.ZipFile(outputFile, "w") as zipped:
for fileIn in files:
# ZIP does not support timestamps before 1980, so for those we simply "touch"
st = os.stat(fileIn)
mtime = time.localtime(st.st_mtime)
dateTime = mtime[0:6]
if dateTime[0] < 1980:
os.utime(fileIn, None) # same as "touch"

zipped.write(fileIn)

    def obtain_output_files(self, job_path, extensions=None):
"""Obtain the files to be added to the log zip from the outputs."""
log_file_extensions = extensions

if not log_file_extensions:
log_file_extensions = [
"*.txt",
"*.log",
"*.out",
"*.output",
"*.xml",
"*.sh",
"*.info",
"*.err",
"prodConf*.py",
"prodConf*.json",
]

files = []

for extension in log_file_extensions:
glob_list = glob.glob(extension, root_dir=job_path, recursive=True)
for check in glob_list:
path = os.path.join(job_path, check)
if os.path.isfile(path):
                    os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
files.append(path)

return files

def get_zip_lfn(self, production_id, job_id, namespace, config_version):
"""Form a logical file name from certain information from the workflow."""
production_id = str(production_id).zfill(8)
job_id = str(job_id).zfill(8)
        jobindex = str(int(job_id) // 10000).zfill(4)

log_path = os.path.join("/lhcb", namespace, config_version, "LOG", production_id, jobindex, "")
path = os.path.join(log_path, f"{job_id}.zip")
return path

def generate_failover_transfer(self, zip_path, zip_name, zip_lfn):
"""Prepare a failover transfer ."""
failoverSEs = getDestinationSEList("Tier1-Failover", siteName())
random.shuffle(failoverSEs)

fileMetaDict = {
"Size": os.path.getsize(zip_path),
"LFN": zip_lfn,
"GUID": getGUID(zip_path),
"Checksum": fileAdler(zip_path),
"ChecksumType": "ADLER32",
}

return FailoverTransfer().transferAndRegisterFile(
fileName=zip_name,
localPath=zip_path,
lfn=zip_lfn,
destinationSEList=failoverSEs,
fileMetaDict=fileMetaDict,
masterCatalogOnly=True,
)
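
For reference, a worked example of the LFN layout produced by get_zip_lfn; the namespace and configuration version are illustrative assumptions.

cmd = UploadLogFile()
lfn = cmd.get_zip_lfn(production_id="42", job_id="12345", namespace="MC", config_version="2024")
# production_id -> "00000042", job_id -> "00012345",
# jobindex -> str(12345 // 10000).zfill(4) == "0001"
assert lfn == "/lhcb/MC/2024/LOG/00000042/0001/00012345.zip"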