Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@ preferably on Defaults subsection. ::

/Operations/Defaults/ResourceStatus
/Config
State = Active
Cache = 720
FromAddress = email@address
/StatusTypes
default = all
StorageElement = ReadAccess,WriteAccess,CheckAccess,RemoveAccess

.. _config section :

Expand All @@ -24,7 +20,5 @@ Config section

This section is all you need to get the RSS working. The parameters are the following:

:State: < Active || InActive ( default if not specified ) > is the flag used on the ResourceStatus helper to switch between CS and RSS. If Active, RSS is used.
:Cache: < <int> || 300 ( default if not specified ) > [ seconds ] sets the lifetime for the cached information on RSSCache.
:FromAddress: < <string> || ( default dirac mail address ) > email used t osend the emails from ( sometimes a valid email address is needed ).
:StatusTypes: if a ElementType has more than one StatusType ( aka StorageElement ), we have to specify them here, Otherwise, "all" is taken as StatusType.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ Please, make sure you have the following schema::

/Operations/Defaults/ResourceStatus
/Config
State = InActive
Cache = 300
/StatusTypes
default = all
StorageElement = ReadAccess,WriteAccess,CheckAccess,RemoveAccess

For a more detailed explanation, take a look to the official documentation:
:ref:`rss-configuration`.
Expand Down
5 changes: 0 additions & 5 deletions docs/source/DeveloperGuide/Systems/ResourceStatus/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ Cache tables for metrics used by policies.

* - Table
- Purpose
* - AccountingCache
- Accounting plots and metrics
* - DowntimeCache
- Scheduled downtimes from GOCDB (DowntimeID, StartDate, EndDate, Severity)
* - GGUSTicketsCache
Expand Down Expand Up @@ -546,9 +544,6 @@ Policies
Actions
``/Operations/Defaults/ResourceStatus/PolicyActions``

Status Types
``/Operations/Defaults/ResourceStatus/Config/StatusTypes``

Services
``/Systems/ResourceStatus/Services/``

Expand Down
18 changes: 2 additions & 16 deletions src/DIRAC/ResourceStatusSystem/Agent/CacheFeederAgent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" CacheFeederAgent
"""CacheFeederAgent

This agent feeds the Cache tables with the outputs of the cache commands.

Expand All @@ -8,6 +8,7 @@
:dedent: 2
:caption: CacheFeederAgent options
"""

from DIRAC import S_OK
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.LCG.GOCDBClient import GOCDBClient
Expand Down Expand Up @@ -61,21 +62,6 @@ def initialize(self):
{"Pilot": {"element": "Resource", "siteName": None}},
]

# FIXME: do not forget about hourly vs Always ...etc
# AccountingCacheCommand
# self.commands[ 'AccountingCache' ] = [
# {'SuccessfullJobsBySiteSplitted' :{'hours' :24, 'plotType' :'Job' }},
# {'FailedJobsBySiteSplitted' :{'hours' :24, 'plotType' :'Job' }},
# {'SuccessfullPilotsBySiteSplitted' :{'hours' :24, 'plotType' :'Pilot' }},
# {'FailedPilotsBySiteSplitted' :{'hours' :24, 'plotType' :'Pilot' }},
# {'SuccessfullPilotsByCESplitted' :{'hours' :24, 'plotType' :'Pilot' }},
# {'FailedPilotsByCESplitted' :{'hours' :24, 'plotType' :'Pilot' }},
# {'RunningJobsBySiteSplitted' :{'hours' :24, 'plotType' :'Job' }},
# # {'RunningJobsBySiteSplitted' :{'hours' :168, 'plotType' :'Job' }},
# # {'RunningJobsBySiteSplitted' :{'hours' :720, 'plotType' :'Job' }},
# # {'RunningJobsBySiteSplitted' :{'hours' :8760, 'plotType' :'Job' }},
# ]

# VOBOXAvailability
# self.commands[ 'VOBOXAvailability' ] = [
# { 'VOBOXAvailability' : {} }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,77 +42,6 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self.setServer("ResourceStatus/ResourceManagement")

# AccountingCache Methods ....................................................

def selectAccountingCache(
self, name=None, plotType=None, plotName=None, result=None, dateEffective=None, lastCheckTime=None, meta=None
):
"""
Gets from PolicyResult all rows that match the parameters given.

:param name: name of an individual of the grid topology
:type name: string, list
:param plotType: the plotType name (e.g. 'Pilot')
:type plotType: string, list
:param plotName: the plot name
:type plotName: string, list
:param result: command result
:type result: string, list
:param dateEffective: time-stamp from which the result is effective
:type dateEffective: datetime, list
:param lastCheckTime: time-stamp setting last time the result was checked
:type lastCheckTime: datetime, list
:param dict meta: metadata for the mysql query. Currently it is being used only for column selection.
For example: meta={'columns': ['Name']} will return only the 'Name' column.
:return: S_OK() || S_ERROR()
"""
columnNames = ["Name", "PlotType", "PlotName", "Result", "DateEffective", "LastCheckTime", "Meta"]
columnValues = [name, plotType, plotName, result, dateEffective, lastCheckTime, meta]

return self._getRPC().select("AccountingCache", prepareDict(columnNames, columnValues))

def addOrModifyAccountingCache(
self, name=None, plotType=None, plotName=None, result=None, dateEffective=None, lastCheckTime=None
):
"""
Adds or updates-if-duplicated to AccountingCache. Using `name`, `plotType`
and `plotName` to query the database, decides whether to insert or update the
table.

:param str name: name of an individual of the grid topology
:param str plotType: name (e.g. 'Pilot')
:param str plotName: the plot name
:param str result: command result
:param datetime dateEffective: timestamp from which the result is effective
:param datetime lastCheckTime: timestamp setting last time the result was checked
:return: S_OK() || S_ERROR()
"""
columnNames = ["Name", "PlotType", "PlotName", "Result", "DateEffective", "LastCheckTime"]
columnValues = [name, plotType, plotName, result, dateEffective, lastCheckTime]

return self._getRPC().addOrModify("AccountingCache", prepareDict(columnNames, columnValues))

def deleteAccountingCache(
self, name=None, plotType=None, plotName=None, result=None, dateEffective=None, lastCheckTime=None
):
"""
Deletes from AccountingCache all rows that match the parameters given.

:param str name: name of an individual of the grid topology
:param str plotType: the plotType name (e.g. 'Pilot')
:param str plotName: the plot name
:param str result: command result
:param datetime dateEffective: timestamp from which the result is effective
:param datetime lastCheckTime: timestamp setting last time the result was checked
:return: S_OK() || S_ERROR()
"""
columnNames = ["Name", "PlotType", "PlotName", "Result", "DateEffective", "LastCheckTime"]
columnValues = [name, plotType, plotName, result, dateEffective, lastCheckTime]

return self._getRPC().delete("AccountingCache", prepareDict(columnNames, columnValues))

# GGUSTicketsCache Methods ...................................................

def selectGGUSTicketsCache(
self, gocSite=None, link=None, openTickets=None, tickets=None, lastCheckTime=None, meta=None
):
Expand Down
104 changes: 9 additions & 95 deletions src/DIRAC/ResourceStatusSystem/Client/ResourceStatus.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" ResourceStatus
"""ResourceStatus

Module that acts as a helper for knowing the status of a resource.
It takes care of switching between the CS and the RSS.
Expand All @@ -10,9 +10,7 @@
from datetime import datetime, timedelta
from time import sleep

from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI
from DIRAC.Core.Utilities import DErrno
from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.Core.Utilities.DIRACSingleton import DIRACSingleton
from DIRAC.ResourceStatusSystem.Client.ResourceStatusClient import ResourceStatusClient
from DIRAC.ResourceStatusSystem.Utilities.InfoGetter import getPoliciesThatApply
Expand All @@ -36,17 +34,16 @@ def __init__(self):

cacheLifeTime = int(self.rssConfig.getConfigCache())

# RSSCache only affects the calls directed to RSS, if using the CS it is not used.
# RSSCache only affects the calls directed to RSS
self.rssCache = RSSCache(cacheLifeTime, self.__updateRssCache)

def getElementStatus(self, elementName, elementType, statusType=None, default=None, vO=None):
"""
Helper function, tries to get information from the RSS for the given
Element, otherwise, it gets it from the CS.
Helper function, tries to get information from the RSS for the given Element

:param elementName: name of the element or list of element names
:type elementName: str, list
:param elementType: type of the element (StorageElement, ComputingElement, FTS, Catalog)
:param elementType: type of the element (StorageElement, ComputingElement, FTS)
:type elementType: str
:param statusType: type of the status (meaningful only when elementType==StorageElement)
:type statusType: None, str, list
Expand All @@ -65,8 +62,6 @@ def getElementStatus(self, elementName, elementType, statusType=None, default=No
S_ERROR( xyz.. )
>>> getElementStatus('ThisIsAWrongName', 'StorageElement', 'WriteAccess')
S_ERROR( xyz.. )
>>> getElementStatus('A_file_catalog', 'FileCatalog')
S_OK( { 'A_file_catalog': { 'all': 'Active' } } } )
>>> getElementStatus('SE1', 'StorageElement', ['ReadAccess', 'WriteAccess'])
S_OK( { 'SE1': { 'ReadAccess': 'Banned' , 'WriteAccess': 'Active'} } } )
>>> getElementStatus('SE1', 'StorageElement')
Expand All @@ -79,7 +74,7 @@ def getElementStatus(self, elementName, elementType, statusType=None, default=No
'CE2': {'all': 'Probing'}}}
"""

allowedParameters = ["StorageElement", "ComputingElement", "FTS", "Catalog"]
allowedParameters = ["StorageElement", "ComputingElement", "FTS"]

if elementType not in allowedParameters:
return S_ERROR(f"{elementType} in not in the list of the allowed parameters: {allowedParameters}")
Expand All @@ -92,17 +87,15 @@ def getElementStatus(self, elementName, elementType, statusType=None, default=No
statusType = ["all"]
elif elementType == "FTS":
statusType = ["all"]
elif elementType == "Catalog":
statusType = ["all"]

return self.__getRSSElementStatus(elementName, elementType, statusType, vO)

def setElementStatus(self, elementName, elementType, statusType, status, reason=None, tokenOwner=None):
"""Tries set information in RSS and in CS.
"""Tries set information in RSS

:param elementName: name of the element
:type elementName: str
:param elementType: type of the element (StorageElement, ComputingElement, FTS, Catalog)
:param elementType: type of the element (StorageElement, ComputingElement, FTS)
:type elementType: str
:param statusType: type of the status (meaningful only when elementType==StorageElement)
:type statusType: str
Expand Down Expand Up @@ -159,7 +152,7 @@ def __getRSSElementStatus(self, elementName, elementType, statusType, vO):

:param elementName: name of the element or list of element names
:type elementName: str, list
:param elementType: type of the element (StorageElement, ComputingElement, FTS, Catalog)
:param elementType: type of the element (StorageElement, ComputingElement, FTS)
:type elementType: str
:param statusType: type of the status (meaningful only when elementType==StorageElement,
otherwise it is 'all' or ['all'])
Expand All @@ -173,53 +166,6 @@ def __getRSSElementStatus(self, elementName, elementType, statusType, vO):

return cacheMatch

def __getCSElementStatus(self, elementName, elementType, statusType, default):
"""Gets from the CS the Element status

:param elementName: name of the element
:type elementName: str
:param elementType: type of the element (StorageElement, ComputingElement, FTS, Catalog)
:type elementType: str
:param statusType: type of the status (meaningful only when elementType==StorageElement)
:type statusType: str, list
:param default: defult value
:type default: None, str
"""
cs_path = None
# DIRAC doesn't store the status of ComputingElements nor FTS in the CS, so here we can just return 'Active'
if elementType in ("ComputingElement", "FTS"):
return S_OK({elementName: {"all": "Active"}})

# If we are here it is because elementType is either 'StorageElement' or 'Catalog'
if elementType == "StorageElement":
cs_path = "/Resources/StorageElements"
elif elementType == "Catalog":
cs_path = "/Resources/FileCatalogs"
statusType = ["Status"]

if not isinstance(elementName, list):
elementName = [elementName]

if not isinstance(statusType, list):
statusType = [statusType]

result = {}
for element in elementName:
for sType in statusType:
# Look in standard location, 'Active' by default
res = gConfig.getValue(f"{cs_path}/{element}/{sType}", "Active")
result.setdefault(element, {})[sType] = res

if result:
return S_OK(result)

if default is not None:
defList = [[el, statusType, default] for el in elementName]
return S_OK(getDictFromList(defList))

_msg = "Element '%s', with statusType '%s' is unknown for CS."
return S_ERROR(DErrno.ERESUNK, _msg % (elementName, statusType))

def __setRSSElementStatus(self, elementName, elementType, statusType, status, reason, tokenOwner):
"""
Sets on the RSS the Elements status
Expand Down Expand Up @@ -254,38 +200,6 @@ def __setRSSElementStatus(self, elementName, elementType, statusType, status, re
# Release lock, no matter what.
self.rssCache.releaseLock()

def __setCSElementStatus(self, elementName, elementType, statusType, status):
"""
Sets on the CS the Elements status
"""
cs_path = None
# DIRAC doesn't store the status of ComputingElements nor FTS in the CS, so here we can just do nothing
if elementType in ("ComputingElement", "FTS"):
return S_OK()

# If we are here it is because elementType is either 'StorageElement' or 'Catalog'
statuses = self.rssConfig.getConfigStatusType(elementType)
if statusType not in statuses:
gLogger.error(f"{statusType} is not a valid statusType")
return S_ERROR(f"{statusType} is not a valid statusType: {statuses}")

if elementType == "StorageElement":
cs_path = "/Resources/StorageElements"
elif elementType == "Catalog":
cs_path = "/Resources/FileCatalogs"
# FIXME: This a probably outdated location (new one is in /Operations/[]/Services/Catalogs)
# but needs to be VO-aware
statusType = "Status"

csAPI = CSAPI()
csAPI.setOption(f"{cs_path}/{elementName}/{elementType}/{statusType}", status)

res = csAPI.commitChanges()
if not res["OK"]:
gLogger.warn(f"CS: {res['Message']}")

return res

def isStorageElementAlwaysBanned(self, seName, statusType):
"""Checks if the AlwaysBanned policy is applied to the SE given as parameter

Expand Down
6 changes: 3 additions & 3 deletions src/DIRAC/ResourceStatusSystem/Client/SiteStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self):

cacheLifeTime = int(self.rssConfig.getConfigCache())

# RSSCache only affects the calls directed to RSS, if using the CS it is not used.
# RSSCache only affects the calls directed to RSS
self.rssCache = RSSCache(cacheLifeTime, self.__updateRssCache)

def __updateRssCache(self):
Expand Down Expand Up @@ -159,7 +159,7 @@ def getSites(self, siteState="Active"):
S_OK( ['test1.test1.uk', 'test3.test3.org'] )
>>> siteStatus.getSites( 'Banned' )
S_OK( ['test0.test0.uk', ... ] )
>>> siteStatus.getSites( 'All' )
>>> siteStatus.getSites( 'all' )
S_OK( ['test1.test1.uk', 'test3.test3.org', 'test4.test4.org', 'test5.test5.org'...] )
>>> siteStatus.getSites( None )
S_ERROR( ... )
Expand All @@ -180,7 +180,7 @@ def getSites(self, siteState="Active"):
if not siteStatusDictRes["Value"]:
return S_OK([])

if siteState.capitalize() == "All":
if siteState.capitalize() == "all":
# if no siteState is set return everything
siteList = list(siteStatusDictRes["Value"])

Expand Down
Loading
Loading