-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
88 lines (70 loc) · 2.68 KB
/
main.py
File metadata and controls
88 lines (70 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""Main module for the log-event function handler."""
import os
import time
import uuid
from crowdstrike.foundry.function import Function, Request, Response, APIError
from falconpy import CustomStorage
# Module-level Function instance: used below to register the /log-event
# handler (via FUNC.handler) and to start the function (via FUNC.run).
FUNC = Function.instance()
def _app_headers() -> dict:
    """Return the extra headers handed to the CustomStorage client.

    Reads the ``APP_ID`` environment variable. When it holds a non-empty
    value, the result maps ``X-CS-APP-ID`` to that value; otherwise the
    result is an empty dict.
    """
    configured_app_id = os.environ.get("APP_ID")
    return {"X-CS-APP-ID": configured_app_id} if configured_app_id else {}
def _storage_error_message(response: dict) -> str:
    """Extract a human-readable error message from a CustomStorage response.

    FalconPy service-class results are normally shaped as
    ``{"status_code": ..., "headers": ..., "body": {...}}`` with API errors
    listed under ``body["errors"]``. The first entry carrying a message wins.
    The top-level ``"error"`` key is still consulted afterwards so any caller
    relying on that older shape keeps working.
    """
    body = response.get("body")
    if isinstance(body, dict):
        for err in body.get("errors") or []:
            if isinstance(err, dict) and err.get("message"):
                return str(err["message"])

    legacy_error = response.get("error")
    if isinstance(legacy_error, dict):
        return legacy_error.get("message", "Unknown error")
    return "Unknown error"


@FUNC.handler(method="POST", path="/log-event")
def on_post(request: Request) -> Response:
    """
    Handle POST requests to the /log-event endpoint.

    Stores the supplied ``event_data`` in the "event_logs" collection under a
    freshly generated UUID, then searches the collection for that id and
    returns whatever the search yields.

    Args:
        request: The incoming request; its body must contain "event_data".

    Returns:
        Response: 200 with ``{"stored": True, "metadata": [...]}`` on success;
        400 when "event_data" is missing; the storage status code when the
        write is rejected; 500 when a ConnectionError, ValueError or KeyError
        is raised while talking to storage.
    """
    # Validate request. A missing/empty body is treated like a body without
    # the key instead of raising TypeError on the membership test.
    if not request.body or "event_data" not in request.body:
        return Response(
            code=400,
            errors=[APIError(code=400, message="missing event_data")]
        )

    event_data = request.body["event_data"]

    try:
        # Store data in a collection.
        # This assumes you've already created a collection named "event_logs".
        event_id = str(uuid.uuid4())
        json_data = {
            "event_id": event_id,
            "data": event_data,
            "timestamp": int(time.time())
        }

        custom_storage = CustomStorage(ext_headers=_app_headers())
        collection_name = "event_logs"

        response = custom_storage.PutObject(body=json_data,
                                            collection_name=collection_name,
                                            object_key=event_id)

        status_code = response["status_code"]
        if status_code != 200:
            # Surface the API's own error text rather than a generic fallback.
            error_message = _storage_error_message(response)
            return Response(
                code=status_code,
                errors=[APIError(
                    code=status_code,
                    message=f"Failed to store event: {error_message}"
                )]
            )

        # Query the collection to retrieve the event by id
        query_response = custom_storage.SearchObjects(filter=f"event_id:'{event_id}'",
                                                      collection_name=collection_name,
                                                      limit=5)

        return Response(
            body={
                "stored": True,
                "metadata": query_response.get("body", {}).get("resources", [])
            },
            code=200
        )
    except (ConnectionError, ValueError, KeyError) as e:
        return Response(
            code=500,
            errors=[APIError(code=500, message=f"Error saving collection: {str(e)}")]
        )
# Start the function only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    FUNC.run()