Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,33 @@ misconfigured range specs in CI. Use `--allow-empty` to exit 0 instead:
commit-guard --range origin/main..HEAD --allow-empty
```

### Machine-readable output

Use `--output jsonl` to emit one JSON line per commit to stdout instead of the
default human-readable text on stderr:

```bash
commit-guard --range origin/main..HEAD --output jsonl
```

Each line is a JSON object:

```json
{
"sha": "abc1234...",
"subject": "feat: add thing",
"ok": false,
"results": [{"check": "body", "level": "error", "message": "missing body"}]
}
```

`sha` is `null` when reading from a file or stdin. `results` is empty when all
checks pass. Pipe to `jq` for filtering:

```bash
commit-guard --range origin/main..HEAD --output jsonl | jq 'select(.ok == false)'
```

### GitHub Actions

```yaml
Expand Down
110 changes: 80 additions & 30 deletions src/git_commit_guard/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import re
import subprocess
import sys
Expand Down Expand Up @@ -53,6 +54,11 @@ class Check(StrEnum):
ALL_CHECKS = frozenset(Check.__members__.values())


class OutputFormat(StrEnum):
TEXT = "text"
JSONL = "jsonl"


def _load_config(start=None):
start = start or Path.cwd()
for directory in [start, *start.parents]:
Expand Down Expand Up @@ -87,18 +93,18 @@ class Level(StrEnum):
class Result:
errors: list = field(default_factory=list)

def error(self, msg):
self.errors.append((Level.ERROR, msg))
def error(self, msg, check=None):
self.errors.append((check, Level.ERROR, msg))

def warn(self, msg):
self.errors.append((Level.WARN, msg))
def warn(self, msg, check=None):
self.errors.append((check, Level.WARN, msg))

def info(self, msg):
self.errors.append((Level.INFO, msg))
def info(self, msg, check=None):
self.errors.append((check, Level.INFO, msg))

@property
def ok(self):
return not any(lvl == Level.ERROR for lvl, _ in self.errors)
return not any(lvl == Level.ERROR for _, lvl, _ in self.errors)


def _ensure_nltk_data():
Expand Down Expand Up @@ -132,27 +138,35 @@ def check_subject( # noqa: PLR0913 Too many arguments in function definition (7
):
m = SUBJECT_RE.match(line)
if not m:
result.error(f"subject does not match 'type(scope): description': {line}")
result.error(
f"subject does not match 'type(scope): description': {line}",
check=Check.SUBJECT,
)
return None

if m.group("type") not in allowed_types:
result.error(f"unknown type: {m.group('type')}")
result.error(f"unknown type: {m.group('type')}", check=Check.SUBJECT)

scope = m.group("scope")
if require_scope and scope is None:
result.error("scope is required")
result.error("scope is required", check=Check.SUBJECT)
if allowed_scopes and scope is not None and scope not in allowed_scopes:
result.error(f"unknown scope: {scope}")
result.error(f"unknown scope: {scope}", check=Check.SUBJECT)

desc = m.group("desc")
if desc[0].isupper():
result.error("description must not start with uppercase")
result.error("description must not start with uppercase", check=Check.SUBJECT)
if desc.endswith("."):
result.error("description must not end with period")
result.error("description must not end with period", check=Check.SUBJECT)
if len(line) > max_subject_length:
result.error(f"subject too long: {len(line)} > {max_subject_length}")
result.error(
f"subject too long: {len(line)} > {max_subject_length}", check=Check.SUBJECT
)
if min_description_length > 0 and len(desc) < min_description_length:
result.error(f"description too short: {len(desc)} < {min_description_length}")
result.error(
f"description too short: {len(desc)} < {min_description_length}",
check=Check.SUBJECT,
)
return desc


Expand All @@ -163,12 +177,16 @@ def check_imperative(desc, result):
return
first = tokens[0]
if _NON_IMPERATIVE_SUFFIX_RE.search(first):
result.error(f"expected imperative verb, got '{first}' (non-imperative suffix)")
result.error(
f"expected imperative verb, got '{first}' (non-imperative suffix)",
check=Check.IMPERATIVE,
)
return
base = wordnet.morphy(first, wordnet.VERB)
if base is not None and base != first:
result.error(
f"expected imperative verb, got '{first}' (inflected form of '{base}')"
f"expected imperative verb, got '{first}' (inflected form of '{base}')",
check=Check.IMPERATIVE,
)
return
tagged = nltk.pos_tag(["to", *tokens])
Expand All @@ -177,23 +195,24 @@ def check_imperative(desc, result):
return
result.error(
f"expected imperative verb, got '{tagged[1][0]}' (POS={tagged[1][1]})",
check=Check.IMPERATIVE,
)


def check_body(lines, result):
if len(lines) < 3: # noqa: PLR2004
result.error("missing body")
result.error("missing body", check=Check.BODY)
return
if lines[1].strip():
result.error("missing blank line between subject and body")
result.error("missing blank line between subject and body", check=Check.BODY)
body_lines = [ln for ln in lines[2:] if not _TRAILER_RE.match(ln)]
if not any(ln.strip() for ln in body_lines):
result.error("missing body")
result.error("missing body", check=Check.BODY)


def check_signed_off(message, result):
if not SIGNED_OFF_RE.search(message):
result.error("missing 'Signed-off-by' trailer")
result.error("missing 'Signed-off-by' trailer", check=Check.SIGNED_OFF)


def check_required_trailers(message, required, result):
Expand All @@ -212,12 +231,12 @@ def check_signature(rev, result):
timeout=GIT_TIMEOUT,
)
if proc.returncode != 0:
result.error("commit is not signed (GPG/SSH)")
result.error("commit is not signed (GPG/SSH)", check=Check.SIGNATURE)
return

output = proc.stderr.lower()
sig_type = "SSH" if "ssh" in output else "GPG"
result.info(f"signature type: {sig_type}")
result.info(f"signature type: {sig_type}", check=Check.SIGNATURE)


def _get_message(rev):
Expand Down Expand Up @@ -266,6 +285,7 @@ class Args:
allow_empty: bool
include_merges: bool
required_trailers: list
output: OutputFormat


def _resolve_enabled(args, config, parser):
Expand Down Expand Up @@ -409,6 +429,12 @@ def _parse_args():
default=False,
help="include merge commits when checking a range (default: excluded)",
)
parser.add_argument(
"--output",
choices=[f.value for f in OutputFormat],
default=OutputFormat.TEXT,
help="output format: text (default) or jsonl",
)
args = parser.parse_args()
config = _load_config()
enabled = _resolve_enabled(args, config, parser)
Expand Down Expand Up @@ -454,12 +480,28 @@ def _parse_args():
allow_empty=args.allow_empty,
include_merges=args.include_merges,
required_trailers=required_trailers,
output=OutputFormat(args.output),
)


def _report(result):
for level, msg in result.errors:
sys.stderr.write(f" {PREFIXES[level]} {msg}\n")
def _report_jsonl(result, sha, subject):
record = {
"sha": sha,
"subject": subject,
"ok": result.ok,
"results": [
{"check": check, "level": str(level), "message": msg}
for check, level, msg in result.errors
],
}
sys.stdout.write(json.dumps(record) + "\n")
return 0 if result.ok else 1


def _report_text(result):
for check, level, msg in result.errors:
prefix = f"[{check}] " if check else ""
sys.stderr.write(f" {PREFIXES[level]} {prefix}{msg}\n")

if result.ok:
sys.stderr.write(" \033[32m✓\033[0m all checks passed\n")
Expand Down Expand Up @@ -507,13 +549,21 @@ def main():
failed = False
for rev in revs:
message = _strip_comments(_get_message(rev))
sys.stderr.write(f"{rev[:7]} {message.split('\n')[0]}\n")
subject = message.split("\n")[0]
result = Result()
_run_checks(args, rev, message, result)
if _report(result) != 0:
failed = True
if args.output == OutputFormat.JSONL:
if _report_jsonl(result, rev, subject) != 0:
failed = True
else:
sys.stderr.write(f"{rev[:7]} {subject}\n")
if _report_text(result) != 0:
failed = True
return 1 if failed else 0

subject = args.message.split("\n")[0]
result = Result()
_run_checks(args, args.rev, args.message, result)
return _report(result)
if args.output == OutputFormat.JSONL:
return _report_jsonl(result, args.rev, subject)
return _report_text(result)
Loading
Loading