Skip to content

Commit 2c19caf

Browse files
committed
test validation for splunk
1 parent b49e81a commit 2c19caf

2 files changed

Lines changed: 239 additions & 1 deletion

File tree

detections/splunk/defense-evasion/disable-script-block-logging.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ status: production
55
description: Detects registry modifications that disable PowerShell Script Block Logging.
66
author: Adam Ring
77
date: '2026-04-08'
8-
modified: ''
8+
modified: '2026-04-08'
99
platform: splunk
1010
query_language: spl
1111
logsource:
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
from pathlib import Path
2+
import sys
3+
import yaml
4+
5+
DETECTIONS_ROOT = Path("detections/splunk")
6+
7+
# Full schema for active Splunk detections
8+
REQUIRED_ACTIVE_FIELDS = [
9+
"title",
10+
"id",
11+
"source_id",
12+
"status",
13+
"description",
14+
"author",
15+
"date",
16+
"platform",
17+
"query_language",
18+
"logsource",
19+
"source_table",
20+
"search",
21+
"how_to_implement",
22+
"severity",
23+
"risk_score",
24+
"tactics",
25+
"techniques",
26+
"kill_chain_phases",
27+
"data_sources",
28+
"falsepositives",
29+
]
30+
31+
# Lighter schema for deprecated Splunk detections
32+
REQUIRED_DEPRECATED_FIELDS = [
33+
"title",
34+
"id",
35+
"status",
36+
"description",
37+
"author",
38+
"date",
39+
"logsource",
40+
]
41+
42+
ALLOWED_STATUS = {"experimental", "testing", "stable", "production", "deprecated"}
43+
ALLOWED_SEVERITY = {"low", "medium", "high", "critical"}
44+
ALLOWED_PLATFORM = {"splunk"}
45+
ALLOWED_QUERY_LANGUAGE = {"spl"}
46+
47+
ID_PREFIX = "SPLK-"
48+
SOURCE_ID_PREFIX = "SENT-"
49+
50+
51+
def load_yaml(path: Path):
52+
with path.open("r", encoding="utf-8") as f:
53+
return yaml.safe_load(f)
54+
55+
56+
def is_deprecated_rule(path: Path, data: dict) -> bool:
57+
if "deprecated" in path.parts:
58+
return True
59+
lifecycle = str(data.get("lifecycle", "")).strip().lower()
60+
status = str(data.get("status", "")).strip().lower()
61+
return lifecycle == "deprecated" or status == "deprecated"
62+
63+
64+
def validate_common_fields(path: Path, data: dict, deprecated: bool):
65+
errors = []
66+
67+
title = str(data.get("title", "")).strip()
68+
rule_id = str(data.get("id", "")).strip()
69+
status = str(data.get("status", "")).strip().lower()
70+
71+
if not title:
72+
errors.append(f"{path}: 'title' must not be empty")
73+
74+
if not rule_id:
75+
errors.append(f"{path}: 'id' must not be empty")
76+
elif not deprecated and not rule_id.startswith(ID_PREFIX):
77+
errors.append(f"{path}: 'id' should start with '{ID_PREFIX}'")
78+
79+
if status and status not in ALLOWED_STATUS:
80+
errors.append(
81+
f"{path}: invalid status '{data.get('status')}'. Allowed: {sorted(ALLOWED_STATUS)}"
82+
)
83+
84+
if "logsource" in data and not isinstance(data.get("logsource"), dict):
85+
errors.append(f"{path}: 'logsource' must be a dictionary")
86+
87+
return errors
88+
89+
90+
def validate_active_rule(path: Path, data: dict):
91+
errors = []
92+
93+
for field in REQUIRED_ACTIVE_FIELDS:
94+
if field not in data:
95+
errors.append(f"{path}: missing required field '{field}'")
96+
97+
if errors:
98+
return errors
99+
100+
errors.extend(validate_common_fields(path, data, deprecated=False))
101+
102+
platform = str(data.get("platform", "")).strip().lower()
103+
query_language = str(data.get("query_language", "")).strip().lower()
104+
severity = str(data.get("severity", "")).strip().lower()
105+
source_id = str(data.get("source_id", "")).strip()
106+
source_table = str(data.get("source_table", "")).strip()
107+
search = str(data.get("search", "")).strip()
108+
109+
if platform not in ALLOWED_PLATFORM:
110+
errors.append(
111+
f"{path}: invalid platform '{data.get('platform')}'. Allowed: {sorted(ALLOWED_PLATFORM)}"
112+
)
113+
114+
if query_language not in ALLOWED_QUERY_LANGUAGE:
115+
errors.append(
116+
f"{path}: invalid query_language '{data.get('query_language')}'. Allowed: {sorted(ALLOWED_QUERY_LANGUAGE)}"
117+
)
118+
119+
if severity not in ALLOWED_SEVERITY:
120+
errors.append(
121+
f"{path}: invalid severity '{data.get('severity')}'. Allowed: {sorted(ALLOWED_SEVERITY)}"
122+
)
123+
124+
if not source_id.startswith(SOURCE_ID_PREFIX):
125+
errors.append(f"{path}: 'source_id' should start with '{SOURCE_ID_PREFIX}'")
126+
127+
if not source_table:
128+
errors.append(f"{path}: 'source_table' must not be empty")
129+
130+
if not search:
131+
errors.append(f"{path}: 'search' must not be empty")
132+
133+
risk_score = data.get("risk_score")
134+
if not isinstance(risk_score, int):
135+
errors.append(f"{path}: 'risk_score' must be an integer")
136+
elif not 0 <= risk_score <= 100:
137+
errors.append(f"{path}: 'risk_score' must be between 0 and 100")
138+
139+
list_fields = [
140+
"how_to_implement",
141+
"tactics",
142+
"techniques",
143+
"kill_chain_phases",
144+
"data_sources",
145+
"falsepositives",
146+
]
147+
148+
for field in list_fields:
149+
if not isinstance(data.get(field), list):
150+
errors.append(f"{path}: '{field}' must be a list")
151+
152+
# Optional but useful sanity checks
153+
if "modified" in data and data.get("modified") in (None, ""):
154+
errors.append(f"{path}: 'modified' is present but empty")
155+
156+
return errors
157+
158+
159+
def validate_deprecated_rule(path: Path, data: dict):
160+
errors = []
161+
162+
for field in REQUIRED_DEPRECATED_FIELDS:
163+
if field not in data:
164+
errors.append(f"{path}: missing required field '{field}'")
165+
166+
if errors:
167+
return errors
168+
169+
errors.extend(validate_common_fields(path, data, deprecated=True))
170+
return errors
171+
172+
173+
def validate_file(path: Path):
174+
try:
175+
data = load_yaml(path)
176+
except Exception as exc:
177+
return [f"{path}: YAML parse error: {exc}"], None
178+
179+
if not isinstance(data, dict):
180+
return [f"{path}: root YAML object must be a dictionary"], None
181+
182+
deprecated = is_deprecated_rule(path, data)
183+
184+
if deprecated:
185+
return validate_deprecated_rule(path, data), data
186+
187+
return validate_active_rule(path, data), data
188+
189+
190+
def main():
191+
if not DETECTIONS_ROOT.exists():
192+
print(f"Detection root not found: {DETECTIONS_ROOT}")
193+
sys.exit(1)
194+
195+
detection_files = sorted(DETECTIONS_ROOT.rglob("*.yml")) + sorted(DETECTIONS_ROOT.rglob("*.yaml"))
196+
197+
if not detection_files:
198+
print("No Splunk detection YAML files found.")
199+
sys.exit(1)
200+
201+
all_errors = []
202+
seen_ids = {}
203+
seen_titles = {}
204+
205+
for path in detection_files:
206+
errors, data = validate_file(path)
207+
all_errors.extend(errors)
208+
209+
if isinstance(data, dict):
210+
deprecated = is_deprecated_rule(path, data)
211+
if not deprecated:
212+
rule_id = str(data.get("id", "")).strip()
213+
title = str(data.get("title", "")).strip().lower()
214+
215+
if rule_id:
216+
seen_ids.setdefault(rule_id, []).append(str(path))
217+
if title:
218+
seen_titles.setdefault(title, []).append(str(path))
219+
220+
for rule_id, paths in seen_ids.items():
221+
if len(paths) > 1:
222+
all_errors.append(f"Duplicate rule id '{rule_id}' found in: {paths}")
223+
224+
for title, paths in seen_titles.items():
225+
if len(paths) > 1:
226+
all_errors.append(f"Duplicate title '{title}' found in: {paths}")
227+
228+
if all_errors:
229+
print("Validation failed:\n")
230+
for err in all_errors:
231+
print(f"- {err}")
232+
sys.exit(1)
233+
234+
print(f"Validation successful. Checked {len(detection_files)} Splunk detection files.")
235+
236+
237+
if __name__ == "__main__":
238+
main()

0 commit comments

Comments
 (0)