Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions DHIS2/notify_expiration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Notify expiration
This script searches for users that will have their password expired, log it and cand send a simplified notification to a webhook URL.

# Usage
Run the script with the following parameters:

python3 notify_expiration.py --config <file.json> (--debug)
Parameters:
--config <file>: Mandatory configuration file where the URL, credentials and other parameters are configured
--debug: Optional parameter to ask for debugging logs, to ensure proper load of the configuration file and all details regarding data gathered and filtered.

# Config file
It accepts the following attributes as a json:
- `server`: Mandatory. The url of the DHIS2 instance. Should end with a slash `/`, but it will append it if not present.
- `user`: Mandatory. The user needed to authenticate to the instance.
- `password`: Mandatory. The password for the authentication.
- `filterGroup`: Optional. Defaults to `None`. An array of userGroup names to be considered as the only ones to evaluate. It allows to restrict the result to those who belong to any of these groups instead of the whole DHIS instance.
- `months_expiry`: Optional. Defaults to `18`. Should reflect the actual password policy of the instance. It does not retrieve the value from the instance as this allows to evaluate instances without an active password expiration policy.
- `how_many_days`: Optional. Defaults to `7`. As this script was conceived to be run periodically notifying which users will have their password expiring in the current week.
- `only_enabled`: Optional. Defaults to `true`. When set to `true`, restricts the evaluation to enabled users. When set to false it will evaluate both enabled and disable users.
- `webhook_url`: Optional. Defaults to `""`. If empty, will not send any notification and will only present the data directly in the console. If set to an URL, it will use it to notify the list of users (with expiration date and if they are disabled). This endpoint should expect to receive a json with 2 parameters: `text` and `dest`. `text` will be compossed with a `title` in bold followed by the content of the notification. `dest` should be used to determine the `destination` of the notification.
- `title`: Optional. Defaults to `"DHIS2 Password Expiration Notification"`. Allows to set a custom title to the notification.
- `destination`: Optional. Defaults to `"sysadmin"`. Allows the webhook URL to select the correct destination of the notification.
- `http_proxy`: Optional. If not present, will try to use an environmental variable of the same name. If present will be used for the webhook notification.
- `https_proxy`: Optional. If not present, will try to use an environmental variable of the same name. If present will be used for the webhook notification.

# How It Works
Parses config file.
Sends a GET request to retrieve the list of all users and some attributes needed to determine when the user is going to expire
Displays what users are in the expected range of dates and optionally sends a notification via webhook, as well as details of it was successful.

# Requirements
Python 3
requests module (pip install requests)
8 changes: 8 additions & 0 deletions DHIS2/notify_expiration/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"server": "https://domain/dhis2-instance/",
"user": "USER",
"password": "PASSWORD",
"months_expiry": 18,
"how_many_days": 7,
"only_enabled": false
}
157 changes: 157 additions & 0 deletions DHIS2/notify_expiration/notify_expiration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import json
import sys
import requests
from requests.auth import HTTPBasicAuth
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import argparse
import os
import logging

class DHIS2Monitor:
def __init__(self, config):
self.logger = logging.getLogger(__name__)

self.server = config.get('server')
if self.server[-1:] != "/":
self.server = self.server + "/"
self.logger.debug(f"Adjusted base server URL to: {self.server}")

self.auth = HTTPBasicAuth(config.get('user'), config.get('password'))

self.filterGroup = config.get('filterGroup')
if self.filterGroup:
self.logger.debug(f"Filtering users by groups: {self.filterGroup}")
else:
self.logger.debug("No user group filtering applied.")
months_expiry = config.get('months_expiry', 18)
how_many_days = config.get('how_many_days', 7)
self.expiry_beginning = (datetime.now() - relativedelta(months=months_expiry)).replace(hour=0, minute=0, second=0, microsecond=0)
self.expiry_end = (self.expiry_beginning + relativedelta(days=how_many_days)).replace(hour=23, minute=59, second=59, microsecond=0)
self.logger.debug(f"Selecting dates between {self.expiry_beginning} and {self.expiry_end}")
self.only_enabled = config.get('only_enabled', True)
if self.only_enabled:
self.logger.debug("Filtering only enabled users")
else:
self.logger.debug("All users will be considered (enabled and disabled)")
self.webhook_url = config.get('webhook_url', None)
if self.webhook_url:
self.logger.debug("Webhook URL configured, notifications will be sent.")
self.http_proxy = config.get('http_proxy', os.getenv("http_proxy", ""))
self.https_proxy = config.get('https_proxy', os.getenv("https_proxy", ""))
if self.http_proxy or self.https_proxy:
self.logger.debug(f"Using proxies: HTTP: {self.http_proxy} / HTTPS: {self.https_proxy}")
else:
self.logger.debug("No proxies configured.")
self.title = config.get('title', "DHIS2 Password Expiration Notification")
self.destination = config.get('destination', "sysadmin")
else:
self.logger.debug("No webhook URL configured, notifications will not be sent.")

def request(self, url):
response = requests.get(url, auth=self.auth)
return response

def parse_dt(self, value=None):
"""Parses a datetime string avoiding milliseconds."""
if not value:
return None
value = value.split(".")[0]
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S")

def get_users_data(self):
"""Fetches the list of users and their userGroups."""
return self.request(f'{self.server}api/users?fields=id,username,userGroups[name],created,passwordLastUpdated,disabled&paging=false')

def filter_users(self, users):
"""Filters out users based on groups, whether the user is disabled and the password expiration time."""
filtered = []
for u in users:
if self.only_enabled and u.get("disabled", False):
continue
if self.filterGroup:
group_names = [g["name"] for g in u.get("userGroups", []) if "name" in g]
if not set(group_names) & set(self.filterGroup):
continue
pwdlast_dt = self.parse_dt(u.get("passwordLastUpdated"))
created_dt = self.parse_dt(u.get("created"))
if pwdlast_dt:
if not (self.expiry_beginning <= pwdlast_dt <= self.expiry_end):
continue
last_change = pwdlast_dt
else:
if not (created_dt and self.expiry_beginning <= created_dt <= self.expiry_end):
continue
last_change = created_dt

days_since = int((datetime.now() - last_change).total_seconds() / 86400)

filtered.append({
"id": u["id"],
"username": u["username"],
"days_since_change": days_since,
"last_change": (pwdlast_dt or created_dt).isoformat(),
"disabled": u.get("disabled")
})

return sorted(filtered, key=lambda x: x["username"])

def handle_api_error(self, response):
self.logger.error(f"API request failed: {response.status_code} - {response.headers} = {response.text[:500]}")
sys.exit(0)

def send_notification(self, content):
proxies = {
"http": self.http_proxy,
"https": self.https_proxy
}

payload = {"text": f"**{self.title}**\n{content}", "dest": f"{self.destination}"}
try:
response = requests.post(self.webhook_url, json=payload, headers={"Content-Type": "application/json"}, proxies=proxies, verify=True)
response.raise_for_status()
self.logger.info(f"[OK] Notification sent: {response.status_code}")
except requests.RequestException as e:
self.logger.error(f"Failed to send notification: {e}")


def main(config_path, debug=False):
logging.basicConfig(
level=logging.INFO if not debug else logging.DEBUG,
format="%(asctime)s %(levelname)s %(message)s"
)

print("Starting at: " + datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))

with open(config_path, 'r') as conf_file:
config_data = json.load(conf_file)

monitor = DHIS2Monitor(config_data)

response = monitor.get_users_data()

if response.status_code != 200:
monitor.handle_api_error(response)

monitor.logger.debug(f"Found {len(response.json().get('users', []))} users in total. Checking for expiring passwords...")

data_filtered = monitor.filter_users(response.json()["users"])

if data_filtered:
monitor.logger.info(f"Found {len(data_filtered)} users with expiring passwords during the current week:")
monitor.logger.info(json.dumps(data_filtered, indent=4))
if monitor.webhook_url:
content = f"The following {len(data_filtered)} users have passwords that will expire during the current week:\n"
for user in data_filtered:
content += f"- {user['username']} ({user['last_change'][:10]})" + (f" [Disabled]" if user['disabled'] else "") + "\n"
monitor.send_notification(content)
else:
monitor.logger.info("No users found with expiring passwords.")

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="DHIS2 User password expiration notification script")
parser.add_argument('--config', required=True, help="Config file path")
parser.add_argument('--debug', action="store_true", help="Enable debug logging")
args = parser.parse_args()
main(args.config, args.debug)