-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathweb_framework.py
More file actions
286 lines (234 loc) · 8.84 KB
/
web_framework.py
File metadata and controls
286 lines (234 loc) · 8.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
"""Common utilities for web framework integration"""
import datetime
import os
import os.path
import re
import sys
import textwrap
import time
from abc import ABC, abstractmethod
from contextvars import ContextVar
from hashlib import sha256
from json.decoder import JSONDecodeError
from typing import Any, Optional, Protocol, Tuple
from _appmap import recording
from . import remote_recording
from .env import Env
from .event import (
Event,
ExceptionEvent,
HttpServerResponseEvent,
ReturnEvent,
describe_value,
)
from .recorder import Recorder, ThreadRecorder
from .utils import root_relative_path, scenario_filename
logger = Env.current.getLogger(__name__)
request_recorder: ContextVar[Optional[Recorder]] = ContextVar("appmap_request_recorder")
# These are the errors that can get raised when trying to update params based on the results of
# parsing the body of an application/json request:
JSON_ERRORS = (JSONDecodeError, AttributeError, TypeError, ValueError)
REQUEST_ENABLED_ATTR = "_appmap_request_enabled"
REMOTE_ENABLED_ATTR = "_appmap_remote_enabled"
class TemplateEvent(Event): # pylint: disable=too-few-public-methods
"""A special call event that records template rendering."""
__slots__ = ["receiver", "path"]
def __init__(self, path, instance=None):
super().__init__("call")
self.receiver = describe_value(None, instance)
self.path = root_relative_path(path)
def to_dict(self, attrs=None):
result = super().to_dict(attrs)
classlike_name = re.sub(r"\W", "", self.path.title())
result.update(
{
"defined_class": f"<templates>.{classlike_name}",
"method_id": "render",
"static": False,
}
)
return result
class HasFilename(Protocol): # pylint: disable=too-few-public-methods
filename: str
class TemplateHandler(HasFilename): # pylint: disable=too-few-public-methods
"""Patch for a template class to capture and record template
rendering (if recording is enabled).
This patch can be used with .utils.patch_class to patch any template class
which has a .render() method. Note it requires a .filename property; if
there is no such property, this handler can be subclassed first to provide it.
"""
def render(self, orig, *args, **kwargs):
"""Calls the original implementation.
If recording is enabled, adds appropriate TemplateEvent
and ReturnEvent.
"""
rec = Recorder.get_current()
if rec.get_enabled():
start = time.monotonic()
call_event = TemplateEvent(self.filename, self)
Recorder.add_event(call_event)
try:
return orig(self, *args, **kwargs)
finally:
if rec.get_enabled():
Recorder.add_event(ReturnEvent(call_event.id, time.monotonic() - start))
NAME_MAX = 255 # true for most filesystems
HASH_LEN = 7 # arbitrary, but git proves it's a reasonable value
APPMAP_SUFFIX = ".appmap.json"
def name_hash(namepart):
"""Returns the hex digits of the sha256 of the os.fsencode()d namepart."""
return sha256(os.fsencode(namepart)).hexdigest()
# pylint: disable=too-many-arguments,too-many-positional-arguments
def create_appmap_file(
output_dir,
request_method,
request_path_info,
request_full_path,
status,
headers,
rec,
):
start_time = datetime.datetime.now()
appmap_name = (
request_method
+ " "
+ request_path_info
+ " ("
+ str(status)
+ ") - "
+ start_time.strftime("%T.%f")[:-3]
)
appmap_basename = scenario_filename("_".join([str(start_time.timestamp()), request_full_path]))
appmap_file_path = os.path.join(output_dir, appmap_basename)
recorder_type = "requests"
metadata = {
"name": appmap_name,
"timestamp": start_time.timestamp(),
"recorder": {"name": "record_requests", "type": recorder_type},
}
recording.write_appmap(rec, appmap_basename, recorder_type, metadata)
headers["AppMap-Name"] = os.path.abspath(appmap_name)
headers["AppMap-File-Name"] = os.path.abspath(appmap_file_path) + APPMAP_SUFFIX
class AppmapMiddleware(ABC):
@abstractmethod
def before_request_main(self, rec, req: Any) -> Tuple[float, int]:
"""Specify the main operations to be performed by a request is processed."""
raise NotImplementedError
# pylint: disable=too-many-arguments,too-many-positional-arguments
def after_request_main(
self, request_path, status, headers, start, call_event_id
) -> Optional[HttpServerResponseEvent]:
if request_path == self.record_url:
return None
env = Env.current
if env.enables("requests") or env.enables("remote"):
rec = request_recorder.get() if env.enables("requests") else Recorder.get_global()
assert rec is not None
duration = time.monotonic() - start
return_event = HttpServerResponseEvent(
parent_id=call_event_id,
elapsed=duration,
status_code=status,
headers=headers,
)
rec.add_event(return_event)
return return_event
return None
def __init__(self, framework_name):
self.record_url = "/_appmap/record"
env = Env.current
record_requests = env.enables("requests")
if record_requests:
logger.info("Requests will be recorded (%s)", framework_name)
self.should_record = env.enables("remote") or record_requests
def before_request_hook(self, request) -> Tuple[Optional[Recorder], float, int]:
rec = None
start = 0
call_event_id = 0
env = Env.current
if env.enables("requests"):
rec = ThreadRecorder()
Recorder.set_current(rec)
rec.start_recording()
request_recorder.set(rec)
elif env.enables("remote"):
rec = Recorder.get_global()
if rec and rec.get_enabled():
start, call_event_id = self.before_request_main(rec, request)
return rec, start, call_event_id
# pylint: disable=too-many-arguments,too-many-positional-arguments
def after_request_hook(
self,
request_path,
request_method,
request_base_url,
status,
headers,
return_event,
) -> None:
if request_path == self.record_url:
return
env = Env.current
if env.enables("requests"):
rec = request_recorder.get()
assert rec is not None
try:
return_event.update(status, headers)
output_dir = Env.current.output_dir / "requests"
create_appmap_file(
output_dir,
request_method,
request_path,
request_base_url,
status,
headers,
rec,
)
finally:
rec.stop_recording()
Recorder.set_current(None)
request_recorder.set(None)
elif env.enables("remote"):
rec = Recorder.get_global()
assert rec is not None
if rec.get_enabled():
return_event.update(status, headers)
def on_exception(self, rec, start, call_event_id, exc_info):
duration = time.monotonic() - start
exception_event = ExceptionEvent(
parent_id=call_event_id,
elapsed=duration,
exc_info=exc_info,
)
rec.add_event(exception_event)
class MiddlewareInserter(ABC):
def __init__(self, debug):
self.debug = debug
@abstractmethod
def middleware_present(self):
"""Return True if the AppMap middleware is present, False otherwise."""
@abstractmethod
def insert_middleware(self):
"""Insert the AppMap middleware. Optionally return a new instance of the app."""
@abstractmethod
def remote_enabled(self):
"""Return True if the AppMap middleware has enabled remote recording, False otherwise."""
def run(self):
if not self.middleware_present():
return self.insert_middleware()
if self.remote_enabled() and not self.debug:
self._show_warning()
return None
def _show_warning(self):
# The user has explicitly asked for remote recording to be enabled in production. Let them
# know this probably isn't a good idea.
print("\n\n*** SECURITY RISK ***", file=sys.stderr)
msg = "Enabling remote recording in production can expose secret information."
print(
textwrap.fill(msg),
file=sys.stderr,
)
print("*** SECURITY RISK ***\n\n", file=sys.stderr)
logger.warning(msg)
def initialize():
remote_recording.initialize()