Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions ontrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import fnmatch
import functools
import grp
import html
import logging
import os
import pathlib
Expand Down Expand Up @@ -687,6 +688,207 @@ def print_report(report_data: dict) -> None:
)


_CSS_TABLE = "border-collapse: collapse; font-family: sans-serif;"
_CSS_TH = (
"border: 1px solid #ccc; padding: 6px 12px; "
"background-color: #f0f0f0; text-align: left;"
)
_CSS_TD = "border: 1px solid #ccc; padding: 6px 12px;"


def _th(text: str) -> str:
"""Return an HTML ``<th>`` element string with standard table heading style.

Args:
text: Cell text; HTML-escaped before insertion.
"""
return f'<th style="{_CSS_TH}">{html.escape(str(text))}</th>'


def _td(value: object) -> str:
"""Return an HTML ``<td>`` element string with standard table cell style.

Args:
value: Cell value; converted to ``str`` and HTML-escaped before insertion.
"""
return f'<td style="{_CSS_TD}">{html.escape(str(value))}</td>'


def _collect_meta_keys(entries: list[dict]) -> list[str]:
"""Return an ordered list of metadata field names found across *entries*.

Fields listed in :data:`_REQUIRED_METADATA_FIELDS` appear first (in their
defined order), followed by any additional keys in the order they are first
encountered while iterating over *entries*.

Args:
entries: List of directory entry dicts as returned by
:func:`_build_directory_entry`.
"""
seen: set[str] = set()
keys: list[str] = []
# Required fields come first.
for field in _REQUIRED_METADATA_FIELDS:
for entry in entries:
meta = entry.get("metadata") or {}
if field in meta and field not in seen:
keys.append(field)
seen.add(field)
break
# Remaining fields in encounter order.
for entry in entries:
meta = entry.get("metadata") or {}
for key in meta:
if key not in seen:
keys.append(key)
seen.add(key)
return keys


def _entry_to_html_row(
entry: dict,
has_groups: bool,
has_sizes: bool,
meta_keys: list[str],
) -> str:
"""Return an HTML ``<tr>`` string for a single directory *entry*.

Args:
entry: Directory entry dict as returned by :func:`_build_directory_entry`.
has_groups: Whether a Groups column is present in the table.
has_sizes: Whether Files and Total Size columns are present.
meta_keys: Ordered list of metadata field names; one ``<td>`` is emitted
per key (empty string when the entry has no value for that key).

Returns:
A ``<tr>...</tr>`` HTML string with all cells inline-styled.
"""
cells = [
_td(entry.get("directory", "")),
_td(entry.get("username", "")),
]
if has_groups:
groups = entry.get("groups") or []
cells.append(_td(", ".join(groups)))
if has_sizes:
cells.append(_td(entry.get("file_count", "")))
cells.append(_td(entry.get("total_size_human", "")))
cells.append(_td("Yes" if entry.get("on_track") else "No"))
meta = entry.get("metadata") or {}
for key in meta_keys:
cells.append(_td(meta.get(key, "")))
return " <tr>" + "".join(cells) + "</tr>"


def _print_html_entries(entries: list[dict]) -> None:
"""Print an HTML table of directory *entries* to stdout.

The table uses inline CSS for compatibility with email clients. Optional
columns (Groups, Files, Total Size) are included only when at least one
entry carries that data. Any metadata fields found across all entries are
appended as additional columns.

Args:
entries: List of directory entry dicts as returned by
:func:`_build_directory_entry`.
"""
has_groups = any("groups" in e for e in entries)
has_sizes = any("file_count" in e for e in entries)
meta_keys = _collect_meta_keys(entries)

headers = ["Directory", "Username"]
if has_groups:
headers.append("Groups")
if has_sizes:
headers.extend(["Files", "Total Size"])
headers.append("On Track")
# capitalize() matches the existing plain-text _print_directory_entry behaviour
# (first letter upper, rest lower). This is a display convention — metadata
# keys from ontrack.yml are not validated here.
headers.extend(k.capitalize() for k in meta_keys)

print(f'<table style="{_CSS_TABLE}">')
print(" <thead>")
print(" <tr>" + "".join(_th(h) for h in headers) + "</tr>")
print(" </thead>")
print(" <tbody>")
for entry in entries:
print(
_entry_to_html_row(
entry, has_groups=has_groups, has_sizes=has_sizes, meta_keys=meta_keys
)
)
print(" </tbody>")
print("</table>")


def _print_html_report(report_data: dict) -> None:
"""Print on-track statistics as two HTML tables to stdout.

Outputs a per-track counts table followed by a per-user on-track share
table (with a total average row appended at the bottom). Both tables use
inline CSS for email-client compatibility.

Args:
report_data: A dict as returned by :func:`compute_report`.
"""
# --- per-track table ---
per_track: dict[str | None, int] = report_data.get("per_track", {})
named_tracks = sorted(t for t in per_track if t is not None)

print(f'<table style="{_CSS_TABLE}">')
print(" <thead>")
print(" <tr>" + _th("Track") + _th("Count") + "</tr>")
print(" </thead>")
print(" <tbody>")
for track in named_tracks:
print(" <tr>" + _td(track) + _td(per_track[track]) + "</tr>")
if None in per_track:
print(" <tr>" + _td("(untracked)") + _td(per_track[None]) + "</tr>")
print(" </tbody>")
print("</table>")

print()

# --- per-user table ---
per_user: dict[str, dict] = report_data["per_user"]
avg_pct = f"{report_data['average_share'] * 100:.1f}%"

print(f'<table style="{_CSS_TABLE}">')
print(" <thead>")
print(
" <tr>"
+ _th("Username")
+ _th("On Track")
+ _th("Total")
+ _th("Share")
+ "</tr>"
)
print(" </thead>")
print(" <tbody>")
for username in sorted(per_user):
stats = per_user[username]
share_pct = f"{stats['share'] * 100:.1f}%"
print(
" <tr>"
+ _td(username)
+ _td(stats["on_track"])
+ _td(stats["total"])
+ _td(share_pct)
+ "</tr>"
)
print(
" <tr>"
+ _td("Total average")
+ _td(report_data["total_on_track"])
+ _td(report_data["total"])
+ _td(avg_pct)
+ "</tr>"
)
print(" </tbody>")
print("</table>")


def load_config(config_path: str) -> dict:
"""Load and return the YAML configuration file."""
with open(config_path, "r") as fh:
Expand Down Expand Up @@ -717,6 +919,7 @@ def main(
output: str | None = None,
report: bool = False,
find: str | None = None,
html_output: bool = False,
) -> None:
"""Run ontrack with the given options.

Expand All @@ -733,6 +936,9 @@ def main(
file; otherwise they are printed to stdout.
find: Optional exact-match filter. Only entries containing at least
one output field whose value exactly matches this string are kept.
html_output: When ``True``, render the report as an HTML table printed
to stdout instead of the default plain-text format. Ignored when
*output* is also provided (YAML to file takes precedence).
"""
config = load_config(config_path)
paths: list[str] = config.get("paths", [])
Expand Down Expand Up @@ -801,6 +1007,8 @@ def main(
with open(output, "w") as fh:
yaml.dump(report_data, fh, default_flow_style=False, allow_unicode=True)
logger.info("Report written to %s", output)
elif html_output:
_print_html_report(report_data)
else:
print_report(report_data)
elif output is not None:
Expand All @@ -819,6 +1027,20 @@ def main(
with open(output, "w") as fh:
yaml.dump(results, fh, default_flow_style=False, allow_unicode=True)
logger.info("Report written to %s", output)
elif html_output:
entries = []
for path in iterator:
entry = _build_directory_entry(
path,
groups=groups,
light=light,
show_progress=progress,
ignore_patterns=ignore_patterns,
valid_tracks=valid_tracks,
)
if entry is not None and _entry_matches_find(entry, find):
entries.append(entry)
_print_html_entries(entries)
else:
for path in iterator:
entry = _build_directory_entry(
Expand Down Expand Up @@ -876,6 +1098,8 @@ def cli() -> None:
" %(prog)s --config ontrack.config --groups mygroup --light\n"
" %(prog)s --config ontrack.config --groups mygroup --report\n"
" %(prog)s --config ontrack.config --output report.yaml\n"
" %(prog)s --config ontrack.config --html\n"
" %(prog)s --config ontrack.config --groups mygroup --report --html\n"
" %(prog)s --config ontrack.config --find alice\n"
" %(prog)s --config ontrack.config --progress\n"
),
Expand Down Expand Up @@ -937,6 +1161,16 @@ def cli() -> None:
"(e.g. username, track name, Yes/No on-track status)."
),
)
parser.add_argument(
"--html",
action="store_true",
default=False,
help=(
"Render the report as an HTML table printed to stdout instead of the "
"default plain-text format. Produces email-friendly output that looks "
"correct with any font. Ignored when --output is also given."
),
)
args = parser.parse_args()
if not sys.argv[1:]:
parser.print_help()
Expand All @@ -950,6 +1184,7 @@ def cli() -> None:
output=args.output,
report=args.report,
find=args.find,
html_output=args.html,
)


Expand Down
Loading
Loading