Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ It is based on layered architecture and is based on DIRAC architecture:
SandboxMetadataDB class is a front-end to the metadata for sandboxes.

* JobParametersDB
JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters.
It is used in most of the WMS components and is based on Elastic/OpenSearch.
JobParametersDB class is a front-end to the OpenSearch based index providing Job Parameters.
It is used in most of the WMS components and is based on OpenSearch.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle.
"""The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle.

This agent will take care of:
- removing all jobs that are in status JobStatus.DELETED
Expand All @@ -22,6 +22,7 @@
than 0.

"""

import datetime
import os

Expand All @@ -40,7 +41,7 @@
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB


class JobCleaningAgent(AgentModule):
Expand Down Expand Up @@ -298,8 +299,8 @@ def deleteJobOversizedSandbox(self, jobIDList):
failed = {}
successful = {}

jobIDs = [int(jobID) for jobID in jobIDList]
result = getJobParameters(jobIDs, "OutputSandboxLFN")
jobIDList = [int(jobID) for jobID in jobIDList]
result = JobParametersDB().getJobParameters(jobIDList, ["OutputSandboxLFN"])
if not result["OK"]:
return result
osLFNDict = result["Value"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" Test class for Job Cleaning Agent
"""
"""Test class for Job Cleaning Agent"""

from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -128,13 +128,15 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):

mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getJobAttributes", return_value=S_OK(""))
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params)
mockJobParamsDB = mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobParametersDB")
mockJobParamsDB.return_value.getJobParameters.return_value = params

jobCleaningAgent = JobCleaningAgent()
jobCleaningAgent.log = gLogger
Expand Down
65 changes: 0 additions & 65 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,56 +110,6 @@ def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=N
"Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp
)

#############################################################################
def getJobParameters(self, jobID, paramList=None):
"""Get Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If parameterList is empty - all the parameters are returned.
"""
jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID

resultDict = {}
if paramList:
if isinstance(paramList, str):
paramList = paramList.split(",")
paramNameList = []
for pn in paramList:
ret = self._escapeString(pn)
if not ret["OK"]:
return ret
paramNameList.append(ret["Value"])
cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format(
",".join(str(int(j)) for j in jobIDList),
",".join(paramNameList),
)
result = self._query(cmd)
if result["OK"]:
if result["Value"]:
for res_jobID, res_name, res_value in result["Value"]:
try:
res_value = res_value.decode(errors="replace") # account for use of BLOBs
except AttributeError:
pass
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value

return S_OK(resultDict) # there's a slim chance that this is an empty dictionary
else:
return S_ERROR("JobDB.getJobParameters: failed to retrieve parameters")

else:
result = self.getFields("JobParameters", ["JobID", "Name", "Value"], {"JobID": jobID})
if not result["OK"]:
return result

for res_jobID, res_name, res_value in result["Value"]:
try:
res_value = res_value.decode(errors="replace") # account for use of BLOBs
except AttributeError:
pass
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value

return S_OK(resultDict) # there's a slim chance that this is an empty dictionary

#############################################################################
def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
"""Get Attic Job Parameters defined for a job with jobID.
Expand Down Expand Up @@ -274,16 +224,6 @@ def getJobAttribute(self, jobID, attribute):
return result
return S_OK(result["Value"].get(attribute))

#############################################################################
@deprecated("Use JobParametersDB instead")
def getJobParameter(self, jobID, parameter):
"""Get the given parameter of a job specified by its jobID"""

result = self.getJobParameters(jobID, [parameter])
if not result["OK"]:
return result
return S_OK(result.get("Value", {}).get(int(jobID), {}).get(parameter))

#############################################################################
def getJobOptParameter(self, jobID, parameter):
"""Get optimizer parameters for the given job."""
Expand Down Expand Up @@ -1023,7 +963,6 @@ def removeJobFromDB(self, jobIDs):

for table in [
"InputData",
"JobParameters",
"AtticJobParameters",
"HeartBeatLoggingInfo",
"OptimizerParameters",
Expand Down Expand Up @@ -1101,10 +1040,6 @@ def rescheduleJob(self, jobID):
return ret
e_jobID = ret["Value"]

res = self._update(f"DELETE FROM JobParameters WHERE JobID={e_jobID}")
if not res["OK"]:
return res

# Delete optimizer parameters
if not self._update(f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID}")["OK"]:
return S_ERROR("JobDB.removeJobOptParameter: operation failed.")
Expand Down
10 changes: 0 additions & 10 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@ CREATE TABLE `InputData` (
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ------------------------------------------------------------------------------
DROP TABLE IF EXISTS `JobParameters`;
CREATE TABLE `JobParameters` (
`JobID` INT(11) UNSIGNED NOT NULL,
`Name` VARCHAR(100) NOT NULL,
`Value` TEXT NOT NULL,
PRIMARY KEY (`JobID`,`Name`),
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ------------------------------------------------------------------------------
DROP TABLE IF EXISTS `OptimizerParameters`;
CREATE TABLE `OptimizerParameters` (
Expand Down
24 changes: 4 additions & 20 deletions src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,19 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
:rtype: dict
"""
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB

elasticJobParametersDB = JobParametersDB()
jobDB = JobDB()

if vo: # a user is connecting, with a proxy
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
if not res["OK"]:
return res
parameters = res["Value"]
else: # a service is connecting, no proxy, e.g. StalledJobAgent
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB

q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
res = jobDB._query(q)
res = JobDB()._query(q)
if not res["OK"]:
return res
if not res["Value"]:
Expand All @@ -184,23 +184,7 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
if not res["OK"]:
return res
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = jobDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)
return S_OK(parameters)


def getAvailableRAM(siteName=None, gridCE=None, queue=None):
Expand Down
Loading