-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_analyzer.py
More file actions
174 lines (141 loc) · 7.05 KB
/
log_analyzer.py
File metadata and controls
174 lines (141 loc) · 7.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# Import necessary modules
from collections import defaultdict
import sys
from datetime import datetime
# Path of the log file that main() opens; a relative path is resolved against
# the current working directory.
LOG_FILE_NAME = "logs.txt"
# Values stored under an entry's 'status_tag' key by parse_log_entry().
STATUS_ANOMALY = "ANOMALY"
STATUS_OK = "OK"
# Brute-force detection parameters read by detect_brute_force(): the number of
# AUTH_FAIL events to look for, and the time span (in seconds) they must fit in.
BRUTE_FORCE_THRESHOLD_FAILURES = 3
BRUTE_FORCE_TIME_WINDOW_SECONDS = 60
# --- Core Parsing Function (Fixed) ---
def parse_log_entry(log_line):
    """
    Turn one raw log line into a structured record.

    Expected layout (whitespace separated):
        YYYY-MM-DD HH:MM:SS IP_ADDRESS STATUS_CODE MESSAGE
    The first four fields are single tokens; everything after them is kept
    verbatim as the message.

    Returns a dict with the keys 'log_line', 'timestamp_raw', 'datetime',
    'ip_address', 'event_type', 'event_status', 'message' and 'status_tag',
    or None when the line has fewer than five fields or its timestamp does
    not match '%Y-%m-%d %H:%M:%S' (the latter also prints a warning).
    """
    stripped_line = log_line.strip()

    # At most four splits: date, time, IP, status token, then the free-form message.
    fields = stripped_line.split(None, 4)
    if len(fields) < 5:
        # Empty or truncated line - nothing usable.
        return None
    date_part, time_part, ip_address, status_token, message = fields

    timestamp_raw = f"{date_part} {time_part}"
    try:
        timestamp_dt = datetime.strptime(timestamp_raw, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        print(f"Warning: Could not parse timestamp in line: {stripped_line}")
        return None

    # The status token carries both the event family and its outcome
    # (e.g. 'AUTH_FAIL'); the family is whatever precedes the first underscore,
    # or the whole token when there is no underscore.
    event_status = status_token.upper()
    event_type = event_status.partition('_')[0]

    # An entry is an anomaly as soon as its status contains any known marker.
    status_tag = STATUS_OK
    for marker in ("FAIL", "ERROR", "DENIED", "CRITICAL"):
        if marker in event_status:
            status_tag = STATUS_ANOMALY
            break

    return {
        'log_line': stripped_line,
        'timestamp_raw': timestamp_raw,
        'datetime': timestamp_dt,
        'ip_address': ip_address,
        'event_type': event_type,
        'event_status': event_status,
        'message': message,
        'status_tag': status_tag,
    }
# --- Data Grouping Functions ---
def group_anomalies_by_ip(anomalies):
    """Return a plain dict mapping each IP address to the list of its anomaly events.

    Events keep the order in which they appear in `anomalies`, and IP keys
    appear in order of first occurrence.
    """
    grouped = {}
    for record in anomalies:
        # setdefault creates the bucket the first time an IP is seen.
        grouped.setdefault(record['ip_address'], []).append(record)
    return grouped
def detect_brute_force(ip_anomalies, threshold=None, window_seconds=None):
    """
    Detects potential brute-force attacks: an IP is flagged when at least
    `threshold` of its AUTH_FAIL events fall within `window_seconds` seconds
    (measured from the first to the last failure of the cluster, inclusive).

    Args:
        ip_anomalies: mapping of IP address -> list of event dicts. Each event
            must provide 'event_status' (str) and 'datetime' (datetime).
        threshold: minimum number of AUTH_FAIL events that form an attack.
            Defaults to BRUTE_FORCE_THRESHOLD_FAILURES.
        window_seconds: maximum span, in seconds, between the first and last
            failure of a cluster. Defaults to BRUTE_FORCE_TIME_WINDOW_SECONDS.

    Returns:
        A sorted list of the flagged IP addresses, so the result is the same
        on every run regardless of set/hash ordering.

    Raises:
        ValueError: if `threshold` is smaller than 1.
    """
    # Resolve defaults at call time so the module constants stay the single
    # source of truth for callers that pass nothing.
    if threshold is None:
        threshold = BRUTE_FORCE_THRESHOLD_FAILURES
    if window_seconds is None:
        window_seconds = BRUTE_FORCE_TIME_WINDOW_SECONDS
    if threshold < 1:
        raise ValueError("threshold must be at least 1")

    brute_force_ips = set()
    for ip, events in ip_anomalies.items():
        # Only AUTH_FAIL events count towards brute-force detection.
        fail_times = sorted(
            e['datetime'] for e in events if 'AUTH_FAIL' in e['event_status']
        )
        # Pair every failure with the one `threshold - 1` positions later: if
        # that pair fits in the window, so do all failures between them.
        # The zip is empty when there are fewer than `threshold` failures.
        for start_time, end_time in zip(fail_times, fail_times[threshold - 1:]):
            if (end_time - start_time).total_seconds() <= window_seconds:
                brute_force_ips.add(ip)
                # One matching window is enough for this IP.
                break
    return sorted(brute_force_ips)
# --- Main Execution Logic ---
def main():
    """Main function to run the log analysis.

    Reads LOG_FILE_NAME, parses every line with parse_log_entry(), and prints
    a summary followed by a per-IP report of anomalous events to stdout.
    Exits with status 1 if the log file is missing or cannot be read.
    """
    # PHASE 0: Load and Parse Data
    try:
        # errors='replace' keeps the platform default encoding but stops a
        # single undecodable byte in the log from aborting the whole analysis.
        with open(LOG_FILE_NAME, 'r', errors='replace') as f:
            log_lines = f.readlines()
    except FileNotFoundError:
        print(f"Error: Log file '{LOG_FILE_NAME}' not found.")
        sys.exit(1)
    except OSError as exc:
        # Permission problems, a directory at that path, I/O errors, ...
        print(f"Error: Could not read log file '{LOG_FILE_NAME}': {exc}")
        sys.exit(1)

    structured_data = []
    anomalies_only = []
    for line in log_lines:
        entry = parse_log_entry(line)
        if entry:
            structured_data.append(entry)
            if entry['status_tag'] == STATUS_ANOMALY:
                anomalies_only.append(entry)
    total_anomalies = len(anomalies_only)

    # ----------------------------------------------------
    # PHASE 1: Summary Report
    # ----------------------------------------------------
    print("--- Analysis Summary ---")
    print(f"Total Valid Entries Processed: {len(structured_data)}")
    print(f"Total Anomalies Found (FAIL/ERROR/Denied): {total_anomalies}")

    # ----------------------------------------------------
    # PHASE 2: Critical Anomalies Report (Grouped by IP)
    # ----------------------------------------------------
    ip_anomalies = group_anomalies_by_ip(anomalies_only)
    # A set gives constant-time membership checks inside the report loop.
    brute_force_ips = set(detect_brute_force(ip_anomalies))

    print("\n--- IP Threat Report (Anomalies Grouped) ---")
    if not ip_anomalies:
        print("No critical anomalies detected.")
        return

    # Sort IPs by the number of incidents (descending); ties keep their
    # first-seen order because the sort is stable.
    sorted_ip_anomalies = sorted(
        ip_anomalies.items(), key=lambda item: len(item[1]), reverse=True
    )
    for ip, events in sorted_ip_anomalies:
        # An IP whose anomalies are all SYSTEM_ERROR events is a systemic
        # problem rather than an authentication threat, so that tag wins.
        if all('SYSTEM_ERROR' in e['event_status'] for e in events):
            tag = "[SYSTEM CRITICAL ERROR]"
        elif ip in brute_force_ips:
            tag = "[BRUTE-FORCE ATTACK]"
        else:
            tag = "[CRITICAL IP]"

        print(f"\n{tag}: {ip} ({len(events)} Incidents)")
        # Every parsed entry carries a 'datetime', so events can be ordered
        # chronologically without a fallback value.
        for event in sorted(events, key=lambda e: e['datetime']):
            # Only show timestamp, event status, and message in the report.
            print(f" -> {event['timestamp_raw']} | Event: {event['event_status']} | Message: {event['message']}")
# Run the analysis only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()