Skip to content

Commit 73a620f

Browse files
Bugfix/audit-logs checkpoints (#169)
* added audit-logs command * remove format option * Added send-to command * added changelog * fix test * changed argument naming convention to singular * refactor search intervals * added format options * refactor * make begin date required * fix tests * added message when no results found and default header for table output * fix send-to when no results found * toggle py42 version * Fix integration test, removed output validation as made input dates dynamic * Add integration test for auditlogs * Fix style * added docs string and renamed variable * refactor: _send_to function * Rectify doc * Changed output fields for table format * add checkpointing to audit-logs commands * sort audit-log events * fix style failures * fix tests * fix moar tests * re-implement checkpointing with deduping by event hashes (which are now also stored in cursor) * use parent class methods since they're the same * update tests, fix style * fix missing mock logger * add docstring to dedupe function * style * add auditlog cursor to profile delete test * make response fixtures generators to match actual get_all behavior * clarify use_checkpoint value is checkpoint name not a boolean * add TestAuditLogCursorStore class * no need to return when writing * fix clear_checkpoint helptext * style * fix "timestamp w/ missing ms" bug and add test * style Co-authored-by: Kiran Chaudhary <kiran.chaudhary@code42.com>
1 parent 9d77217 commit 73a620f

File tree

6 files changed

+653
-63
lines changed

6 files changed

+653
-63
lines changed

src/code42cli/cmds/auditlogs.py

Lines changed: 130 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
import json
2-
from _collections import OrderedDict
1+
from collections import OrderedDict
2+
from datetime import datetime
3+
from datetime import timezone
34

45
import click
56

67
from code42cli.click_ext.groups import OrderedGroup
8+
from code42cli.cmds.search.cursor_store import AuditLogCursorStore
9+
from code42cli.cmds.search.options import BeginOption
710
from code42cli.date_helper import parse_max_timestamp
811
from code42cli.date_helper import parse_min_timestamp
912
from code42cli.logger import get_logger_for_server
@@ -14,10 +17,12 @@
1417
from code42cli.options import send_to_format_options
1518
from code42cli.options import server_options
1619
from code42cli.output_formats import OutputFormatter
20+
from code42cli.util import hash_event
1721
from code42cli.util import warn_interrupt
1822

1923
EVENT_KEY = "events"
2024
AUDIT_LOGS_KEYWORD = "audit-logs"
25+
AUDIT_LOG_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
2126

2227
AUDIT_LOGS_DEFAULT_HEADER = OrderedDict()
2328
AUDIT_LOGS_DEFAULT_HEADER["timestamp"] = "Timestamp"
@@ -26,8 +31,7 @@
2631
AUDIT_LOGS_DEFAULT_HEADER["actorIpAddress"] = "ActorIpAddress"
2732
AUDIT_LOGS_DEFAULT_HEADER["userName"] = "AffectedUser"
2833
AUDIT_LOGS_DEFAULT_HEADER["userId"] = "AffectedUserUID"
29-
# AUDIT_LOGS_DEFAULT_HEADER["success"] = "Success"
30-
# AUDIT_LOGS_DEFAULT_HEADER["resultCount"] = "ResultCount"
34+
3135

3236
filter_option_usernames = click.option(
3337
"--username", required=False, help="Filter results by usernames.", multiple=True,
@@ -67,7 +71,7 @@ def filter_options(f):
6771
f,
6872
AUDIT_LOGS_KEYWORD,
6973
callback=lambda ctx, param, arg: parse_min_timestamp(arg),
70-
required=True,
74+
cls=BeginOption,
7175
)
7276
f = end_option(
7377
f, AUDIT_LOGS_KEYWORD, callback=lambda ctx, param, arg: parse_max_timestamp(arg)
@@ -81,16 +85,34 @@ def filter_options(f):
8185
return f
8286

8387

88+
checkpoint_option = click.option(
89+
"-c",
90+
"--use-checkpoint",
91+
metavar="checkpoint",
92+
help="Only get audit-log events that were not previously retrieved.",
93+
)
94+
95+
8496
@click.group(cls=OrderedGroup)
8597
@sdk_options(hidden=True)
8698
def audit_logs(state):
8799
"""Retrieve audit logs."""
88-
pass
100+
# store cursor getter on the group state so shared --begin option can use it in validation
101+
state.cursor_getter = _get_audit_log_cursor_store
102+
103+
104+
@audit_logs.command()
105+
@click.argument("checkpoint-name")
106+
@sdk_options()
107+
def clear_checkpoint(state, checkpoint_name):
108+
"""Remove the saved audit log checkpoint from `--use-checkpoint/-c` mode."""
109+
_get_audit_log_cursor_store(state.profile.name).delete(checkpoint_name)
89110

90111

91112
@audit_logs.command()
92113
@filter_options
93114
@format_option
115+
@checkpoint_option
94116
@sdk_options()
95117
def search(
96118
state,
@@ -103,11 +125,19 @@ def search(
103125
affected_user_id,
104126
affected_username,
105127
format,
128+
use_checkpoint,
106129
):
107130
"""Search audit logs."""
108-
_search(
131+
formatter = OutputFormatter(format, AUDIT_LOGS_DEFAULT_HEADER)
132+
cursor = _get_audit_log_cursor_store(state.profile.name)
133+
if use_checkpoint:
134+
checkpoint_name = use_checkpoint
135+
checkpoint = cursor.get(checkpoint_name)
136+
if checkpoint is not None:
137+
begin = checkpoint
138+
139+
events = _get_all_audit_log_events(
109140
state.sdk,
110-
format,
111141
begin_time=begin,
112142
end_time=end,
113143
event_types=event_type,
@@ -117,10 +147,25 @@ def search(
117147
affected_user_ids=affected_user_id,
118148
affected_usernames=affected_username,
119149
)
150+
if use_checkpoint:
151+
checkpoint_name = use_checkpoint
152+
events = list(
153+
_dedupe_checkpointed_events_and_store_updated_checkpoint(
154+
cursor, checkpoint_name, events
155+
)
156+
)
157+
if not events:
158+
click.echo("No results found.")
159+
return
160+
elif len(events) > 10:
161+
click.echo_via_pager(formatter.get_formatted_output(events))
162+
else:
163+
formatter.echo_formatted_list(events)
120164

121165

122166
@audit_logs.command()
123167
@filter_options
168+
@checkpoint_option
124169
@server_options
125170
@send_to_format_options
126171
@sdk_options()
@@ -137,13 +182,19 @@ def send_to(
137182
user_ip,
138183
affected_user_id,
139184
affected_username,
185+
use_checkpoint,
140186
):
141187
"""Send audit logs to the given server address."""
142-
_send_to(
188+
logger = get_logger_for_server(hostname, protocol, format)
189+
cursor = _get_audit_log_cursor_store(state.profile.name)
190+
if use_checkpoint:
191+
checkpoint_name = use_checkpoint
192+
checkpoint = cursor.get(checkpoint_name)
193+
if checkpoint is not None:
194+
begin = checkpoint
195+
196+
events = _get_all_audit_log_events(
143197
state.sdk,
144-
hostname,
145-
protocol,
146-
format,
147198
begin_time=begin,
148199
end_time=end,
149200
event_types=event_type,
@@ -153,43 +204,81 @@ def send_to(
153204
affected_user_ids=affected_user_id,
154205
affected_usernames=affected_username,
155206
)
207+
if use_checkpoint:
208+
checkpoint_name = use_checkpoint
209+
events = list(
210+
_dedupe_checkpointed_events_and_store_updated_checkpoint(
211+
cursor, checkpoint_name, events
212+
)
213+
)
214+
with warn_interrupt():
215+
event = None
216+
for event in events:
217+
logger.info(event)
218+
if event is None: # generator was empty
219+
click.echo("No results found.")
156220

157221

158-
def _search(sdk, format, **filter_args):
159-
160-
formatter = OutputFormatter(format, AUDIT_LOGS_DEFAULT_HEADER)
222+
def _get_all_audit_log_events(sdk, **filter_args):
161223
response_gen = sdk.auditlogs.get_all(**filter_args)
162-
163224
events = []
164225
try:
165-
for response in response_gen:
166-
response_dict = json.loads(response.text)
167-
if EVENT_KEY in response_dict:
168-
events.extend(response_dict.get(EVENT_KEY))
226+
responses = list(response_gen)
169227
except KeyError:
170228
# API endpoint (get_page) returns a response without events key when no records are found
171229
# e.g {"paginationRangeStartIndex": 10000, "paginationRangeEndIndex": 10000, "totalResultCount": 1593}
172-
pass
230+
# we can remove this check once PL-93211 is resolved and deployed.
231+
return events
173232

174-
event_count = len(events)
175-
if not event_count:
176-
click.echo("No results found.")
177-
elif event_count > 10:
178-
click.echo_via_pager(formatter.get_formatted_output(events))
179-
else:
180-
formatter.echo_formatted_list(events)
233+
for response in responses:
234+
if EVENT_KEY in response.data:
235+
response_events = response.data.get(EVENT_KEY)
236+
events.extend(response_events)
181237

238+
return sorted(events, key=lambda x: x.get("timestamp"))
182239

183-
def _send_to(sdk, hostname, protocol, format, **filter_args):
184-
logger = get_logger_for_server(hostname, protocol, format)
185-
with warn_interrupt():
186-
response_gen = sdk.auditlogs.get_all(**filter_args)
187-
try:
188-
for response in response_gen:
189-
if EVENT_KEY in response:
190-
for event in response[EVENT_KEY]:
191-
logger.info(event)
192-
else:
193-
logger.info("No results found.")
194-
except KeyError:
195-
pass
240+
241+
def _dedupe_checkpointed_events_and_store_updated_checkpoint(
242+
cursor, checkpoint_name, events
243+
):
244+
"""De-duplicates events across checkpointed runs. Since using the timestamp of the last event
245+
processed as the `--begin` time of the next run causes the last event to show up again in the
246+
next results, we hash the last event(s) of each run and store those hashes in the cursor to
247+
filter out on the next run. It's also possible that two events have the exact same timestamp, so
248+
`checkpoint_events` needs to be a list of hashes so we can filter out everything that's actually
249+
been processed.
250+
"""
251+
252+
checkpoint_events = cursor.get_events(checkpoint_name)
253+
new_timestamp = None
254+
new_events = []
255+
for event in events:
256+
event_hash = hash_event(event)
257+
if event_hash not in checkpoint_events:
258+
if event["timestamp"] != new_timestamp:
259+
new_timestamp = event["timestamp"]
260+
new_events.clear()
261+
new_events.append(event_hash)
262+
yield event
263+
ts = _parse_audit_log_timestamp_string_to_timestamp(new_timestamp)
264+
cursor.replace(checkpoint_name, ts)
265+
cursor.replace_events(checkpoint_name, new_events)
266+
267+
268+
def _get_audit_log_cursor_store(profile_name):
269+
return AuditLogCursorStore(profile_name)
270+
271+
272+
def _parse_audit_log_timestamp_string_to_timestamp(ts):
273+
# example: {"property": "bar", "timestamp": "2020-11-23T17:13:26.239647Z"}
274+
ts = ts[:-1]
275+
try:
276+
dt = datetime.strptime(ts, AUDIT_LOG_TIMESTAMP_FORMAT).replace(
277+
tzinfo=timezone.utc
278+
)
279+
except ValueError:
280+
ts = ts + ".0" # handle timestamps that are missing ms
281+
dt = datetime.strptime(ts, AUDIT_LOG_TIMESTAMP_FORMAT).replace(
282+
tzinfo=timezone.utc
283+
)
284+
return dt.timestamp()

src/code42cli/cmds/search/cursor_store.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import os
23
from os import path
34

@@ -37,7 +38,7 @@ def replace(self, cursor_name, new_timestamp):
3738
"""Replaces the last stored date observed timestamp with the given one."""
3839
location = path.join(self._dir_path, cursor_name)
3940
with open(location, "w") as checkpoint:
40-
return checkpoint.write(str(new_timestamp))
41+
checkpoint.write(str(new_timestamp))
4142

4243
def delete(self, cursor_name):
4344
"""Removes a single cursor from the store."""
@@ -75,13 +76,31 @@ def __init__(self, profile_name):
7576
super().__init__(dir_path)
7677

7778

78-
def get_file_event_cursor_store(profile_name):
79-
return FileEventCursorStore(profile_name)
79+
class AuditLogCursorStore(BaseCursorStore):
80+
def __init__(self, profile_name):
81+
dir_path = get_user_project_path("audit_log_checkpoints", profile_name)
82+
super().__init__(dir_path)
8083

84+
def get_events(self, cursor_name):
85+
try:
86+
location = path.join(self._dir_path, cursor_name) + "_events"
87+
with open(location) as checkpoint:
88+
try:
89+
return json.loads(checkpoint.read())
90+
except json.JSONDecodeError:
91+
return []
92+
except FileNotFoundError:
93+
return []
8194

82-
def get_alert_cursor_store(profile_name):
83-
return AlertCursorStore(profile_name)
95+
def replace_events(self, cursor_name, new_events):
96+
location = path.join(self._dir_path, cursor_name) + "_events"
97+
with open(location, "w") as checkpoint:
98+
checkpoint.write(json.dumps(new_events))
8499

85100

86101
def get_all_cursor_stores_for_profile(profile_name):
87-
return [FileEventCursorStore(profile_name), AlertCursorStore(profile_name)]
102+
return [
103+
FileEventCursorStore(profile_name),
104+
AlertCursorStore(profile_name),
105+
AuditLogCursorStore(profile_name),
106+
]

src/code42cli/util.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import json
12
import os
23
import shutil
34
from collections import OrderedDict
45
from functools import wraps
6+
from hashlib import md5
57
from os import path
68
from signal import getsignal
79
from signal import SIGINT
@@ -167,3 +169,9 @@ def _get_default_header(header_items):
167169
if key not in header and isinstance(key, str):
168170
header[key] = key
169171
return header
172+
173+
174+
def hash_event(event):
175+
if isinstance(event, dict):
176+
event = json.dumps(event, sort_keys=True)
177+
return md5(event.encode()).hexdigest()

0 commit comments

Comments
 (0)