-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
355 lines (288 loc) · 12.5 KB
/
api.py
File metadata and controls
355 lines (288 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
import datetime
import hashlib
import hmac
import json
import logging
import re
import uuid
from argparse import ArgumentParser
from email.message import Message
from enum import Enum
from http.server import (
    BaseHTTPRequestHandler,
    HTTPServer,
)
from typing import Any, Callable, Optional, Union

from scoring import get_interests, get_score
class Gender(Enum):
    """Numeric gender codes (the same values ``GenderField`` accepts)."""

    UNKNOWN = 0
    MALE = 1
    FEMALE = 2
class ErrorMessage(Enum):
    """Human-readable texts for the error statuses the API can return."""

    BAD_REQUEST = "Bad Request"
    FORBIDDEN = "Forbidden"
    NOT_FOUND = "Not Found"
    INVALID_REQUEST = "Invalid Request"
    INTERNAL_ERROR = "Internal Server Error"
# HTTP status codes used in API responses.
OK = 200
BAD_REQUEST = 400
FORBIDDEN = 403
NOT_FOUND = 404
INVALID_REQUEST = 422
INTERNAL_ERROR = 500

# Default error text for every non-OK status code.
ERRORS = {
    BAD_REQUEST: "Bad Request",
    FORBIDDEN: "Forbidden",
    NOT_FOUND: "Not Found",
    INVALID_REQUEST: "Invalid Request",
    INTERNAL_ERROR: "Internal Server Error",
}

# Salts and the privileged login used when verifying request tokens.
SALT = "Otus"
ADMIN_LOGIN = "admin"
ADMIN_SALT = "42"

# Names of the API methods that may be requested.
METHODS = ["clients_interests", "online_score"]
class Field:
    """Base data descriptor for a validated request attribute.

    Values are stored in the owning instance's ``__dict__`` under the
    attribute name. Problems found while assigning are appended to
    ``self.errors`` as ``{field_name: message}`` dicts instead of being raised.

    Args:
        required: Whether the field must be present in a request. Presence is
            not checked here; ``BaseValidate`` reports missing required fields.
        nullable: Whether ``None`` is an acceptable value.
    """

    def __init__(self, required: bool = False, nullable: bool = True) -> None:
        self.required = required
        self.nullable = nullable
        # NOTE: the list lives on the descriptor, so it is shared by every
        # instance of the owning class; BaseValidate resets it before each
        # assignment.
        self.errors: list[dict[str, str]] = []

    def __set_name__(self, owner: type, name: str) -> None:
        """Remember the attribute name this descriptor was bound to."""
        self._name = name

    def __get__(self, instance: Any, owner: type) -> Any:
        """Return the stored value, or the descriptor itself on class access."""
        if instance is None:
            return self
        return instance.__dict__[self._name]

    def __set__(self, instance: Any, value: Any) -> None:
        """Validate ``value``, record any errors, then store it on ``instance``.

        The value is stored even when it is invalid, so later attribute reads
        never fail for a field that was supplied.
        """
        if value is None and not self.nullable:
            # Already rejected: skip validate(), which is written for real
            # values and may raise on None in subclasses.
            self.errors.append({self._name: "must be not null"})
        else:
            self.validate(value, field_name=self._name)
        instance.__dict__[self._name] = value

    def validate(self, value: Any, field_name: str) -> Any:
        """Hook for subclasses; the base implementation accepts any value."""
        return value
class BaseValidate(type):
    """Metaclass that gives request classes a validating ``__init__``.

    For a class that declares ``Field`` attributes directly in its body, the
    generated ``__init__`` accepts keyword arguments, assigns each declared
    field (which triggers the field's own validation) and gathers all problems
    in ``self.errors`` as ``{field_name: message}``. Keyword arguments that do
    not match a declared field are ignored.
    """

    def __new__(
        mcs: type,
        name: str,
        bases: list,
        namespace: dict[str, Any],
    ) -> type:
        cls: type = super().__new__(mcs, name, bases, namespace)  # type: ignore[misc]
        declared: list[tuple[str, Field]] = [
            (key, attr) for key, attr in namespace.items() if isinstance(attr, Field)
        ]
        if not declared:
            # Nothing to validate: leave the class exactly as written.
            return cls  # type: ignore[return-value]

        # Only a ``validate`` defined in this very class body is invoked.
        run_validate = callable(namespace.get("validate"))

        def _init(self: Any, **kwargs: Any) -> None:
            self.errors = {}
            for key, descriptor in declared:
                # Errors are kept on the shared descriptor; start clean.
                descriptor.errors = []
                if key not in kwargs:
                    if getattr(descriptor, "required", False):
                        self.errors[key] = "is required"
                    else:
                        # Absent optional fields read as None.
                        self.__dict__[key] = None
                    continue
                setattr(self, key, kwargs[key])
                for problem in descriptor.errors:
                    # Later messages for the same field replace earlier ones.
                    self.errors.update(problem)
            if run_validate:
                self.validate()

        cls.__init__ = _init  # type: ignore[misc]
        return cls  # type: ignore[return-value]
class CharField(Field):
    """String field with optional length bounds.

    Args:
        min_lenght: Minimum allowed length, or None for no lower bound.
        max_lenght: Maximum allowed length, or None for no upper bound.
        **kwargs: Forwarded to ``Field`` (``required``, ``nullable``).

    The misspelled ``min_lenght`` / ``max_lenght`` names are kept because they
    are part of the constructor's keyword interface.
    """

    def __init__(
        self,
        min_lenght: Optional[int] = None,
        max_lenght: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.min_lenght = min_lenght
        self.max_lenght = max_lenght

    def validate(self, value: Any, field_name: str) -> Any:
        """Record an error if ``value`` is not a str within the length bounds.

        Returns ``value`` unchanged; problems are appended to ``self.errors``.
        """
        if value is None:
            # Null-ness is governed by ``nullable`` in Field.__set__; None is
            # not a malformed string.
            return value
        if not isinstance(value, str):
            self.errors.append({field_name: f"{value} is not char"})
            # Stop here: len() below would raise TypeError for e.g. an int.
            return value
        size = len(value)
        if self.min_lenght is not None and size < self.min_lenght:
            self.errors.append({field_name: f"{value} must have min {self.min_lenght} chars"})
        if self.max_lenght is not None and size > self.max_lenght:
            self.errors.append({field_name: f"{value} must have max {self.max_lenght} chars"})
        return value
class EmailField(Field):
    """Field for an e-mail address: a string that contains ``@``."""

    def validate(self, value: Any, field_name: str) -> Any:
        """Return ``value`` when acceptable; otherwise record an error and return None."""
        if value is None:
            # Null-ness is governed by ``nullable`` in Field.__set__.
            return value
        # The isinstance check keeps ``"@" in value`` from raising TypeError
        # on non-iterable input such as an int.
        if isinstance(value, str) and "@" in value:
            return value
        self.errors.append({field_name: f"{value} must have @"})
        return None
class PhoneField(Field):
    """Field for a phone number: a str or int of 11 characters starting with 7."""

    def validate(self, value: Any, field_name: str) -> Any:
        """Record an error for each rule ``value`` breaks; return it unchanged."""
        if value is None:
            # Null-ness is governed by ``nullable`` in Field.__set__.
            return value
        if not isinstance(value, (str, int)):
            self.errors.append({field_name: f"{value} is not char or int"})
            # Length/prefix checks are meaningless for other types.
            return value
        digits = str(value)
        if len(digits) != 11:
            self.errors.append({field_name: f"{value} must have 11 chars"})
        if not digits.startswith("7"):
            self.errors.append({field_name: f"{value} must starts with 7"})
        return value
class BirthDayField(Field):
    """Field for a birthday in ``DD.MM.YYYY`` form, at most 70 years in the past."""

    # Dots are escaped so only a literal "." is accepted as the separator.
    _FORMAT = re.compile(r"\d{2}\.\d{2}\.\d{4}")
    _MAX_AGE_YEARS = 70

    def validate(self, value: Any, field_name: str) -> Any:
        """Record an error for a malformed or too-old date; return ``value`` unchanged."""
        if value is None:
            # Null-ness is governed by ``nullable`` in Field.__set__.
            return value
        if not isinstance(value, str) or not self._FORMAT.fullmatch(value):
            self.errors.append({field_name: f"{value} not match with DD.MM.YYYY format"})
            return value
        try:
            born = datetime.datetime.strptime(value, "%d.%m.%Y").date()
        except ValueError:
            # Right shape but not a real calendar date, e.g. 31.02.2000.
            self.errors.append({field_name: f"{value} not match with DD.MM.YYYY format"})
            return value

        today = datetime.date.today()
        try:
            oldest_allowed = today.replace(year=today.year - self._MAX_AGE_YEARS)
        except ValueError:
            # Today is Feb 29 and the target year is not a leap year.
            oldest_allowed = today.replace(year=today.year - self._MAX_AGE_YEARS, day=28)
        if born < oldest_allowed:
            self.errors.append({field_name: f"{value} date must be less 70 year from now"})
        return value
class GenderField(Field):
    """Field for a gender code: the integer 0, 1 or 2."""

    def validate(self, value: Any, field_name: str) -> Any:
        """Record an error unless ``value`` is one of the allowed integers."""
        if value is None:
            # Null-ness is governed by ``nullable`` in Field.__set__.
            return value
        # bool is a subclass of int and True == 1, so exclude it explicitly;
        # the isinstance check also rejects floats such as 1.0.
        is_code = isinstance(value, int) and not isinstance(value, bool)
        if not is_code or value not in (0, 1, 2):
            self.errors.append({field_name: f"{value} must be 0, 1 or 2"})
        return value
class ClientIDsField(Field):
    """Field for client ids: a non-empty list of integers."""

    def validate(self, value: Any, field_name: str) -> Any:
        """Record the first rule ``value`` breaks; return it unchanged.

        None is reported as "must be list": callers take ``len()`` of this
        field, so it is never allowed to be null.
        """
        if not isinstance(value, list):
            self.errors.append({field_name: f"{value} must be list"})
            # Stop here: iterating a non-iterable below would raise TypeError.
            return value
        if not value:
            self.errors.append({field_name: f"{value} must be not empty"})
            return value
        # bool is a subclass of int; True/False are not client ids.
        if not all(isinstance(item, int) and not isinstance(item, bool) for item in value):
            self.errors.append({field_name: f"{value} must be all integers"})
        return value
class DateField(Field):
    """Field for a calendar date written as ``DD.MM.YYYY``."""

    # Dots are escaped so only a literal "." is accepted as the separator.
    _FORMAT = re.compile(r"\d{2}\.\d{2}\.\d{4}")

    def validate(self, value: Any, field_name: str) -> Any:
        """Record an error unless ``value`` is a real date in ``DD.MM.YYYY`` form."""
        if value is None:
            # Null-ness is governed by ``nullable`` in Field.__set__.
            return value
        if not isinstance(value, str) or not self._FORMAT.fullmatch(value):
            self.errors.append({field_name: f"{value} not match with DD.MM.YYYY format"})
            return value
        try:
            # The regex only checks the shape; strptime rejects e.g. 99.99.9999.
            datetime.datetime.strptime(value, "%d.%m.%Y")
        except ValueError:
            self.errors.append({field_name: f"{value} not match with DD.MM.YYYY format"})
        return value
class ArgumentsField(Field):
    """Field holding method arguments; the value has to be a dict."""

    def validate(self, value: Any, field_name: str) -> Any:
        """Record an error for any non-dict value; return ``value`` unchanged."""
        if isinstance(value, dict):
            return value
        self.errors.append({field_name: f"{value} must be dict"})
        return value
class ClientsInterestsRequest(metaclass=BaseValidate):
    """Arguments of the ``clients_interests`` method."""

    # Must be present in the request.
    client_ids = ClientIDsField(required=True)
    # Optional and nullable (the Field defaults).
    date = DateField()
class OnlineScoreRequest(metaclass=BaseValidate):
    """Arguments of the ``online_score`` method.

    Every field is optional and nullable, but ``validate`` demands that at
    least one of the pairs phone+email, first_name+last_name or
    gender+birthday is filled in.
    """

    first_name = CharField()
    last_name = CharField()
    email = EmailField()
    phone = PhoneField()
    birthday = BirthDayField()
    gender = GenderField()

    def validate(self) -> None:
        """Add an entry to ``self.errors`` when no complete pair was supplied."""
        complete_pairs = (
            self.phone and self.email,
            self.first_name and self.last_name,
            # Gender 0 is a legitimate value, hence the explicit None test.
            self.gender is not None and self.birthday,
        )
        if any(complete_pairs):
            return
        self.errors[  # type: ignore[attr-defined]
            "At least one pair must be provided"
        ] = "phone+email, first_name+last_name or gender+birthday"
class MethodRequest(metaclass=BaseValidate):
    """Top-level request envelope: credentials, method name and its arguments."""

    account = CharField()
    login = CharField(required=True)
    token = CharField(required=True)
    arguments = ArgumentsField(required=True)
    method = CharField(required=True, nullable=False)

    @property
    def is_admin(self) -> bool:
        """Whether the request's login equals ``ADMIN_LOGIN``."""
        return ADMIN_LOGIN == str(self.login)
def check_auth(request: MethodRequest) -> bool:
    """Return True when ``request.token`` matches the expected SHA-512 digest.

    For the admin login the digest is taken over the current hour
    (``%Y%m%d%H``) plus ``ADMIN_SALT``; for everyone else over
    ``account + login + SALT``. A missing account, login or token is treated
    as an empty string.
    """
    if request.is_admin:
        # NOTE(review): uses naive local time, so client and server presumably
        # have to agree on the time zone - confirm.
        secret = datetime.datetime.now().strftime("%Y%m%d%H") + ADMIN_SALT
    else:
        account = request.account if request.account is not None else ""
        login = request.login if request.login is not None else ""
        secret = account + login + SALT
    expected = hashlib.sha512(secret.encode("utf-8")).hexdigest()
    supplied = request.token if request.token is not None else ""
    # Constant-time comparison so response timing does not reveal how much of
    # the token matched. Bytes are compared because compare_digest rejects
    # non-ASCII str; "replace" keeps odd input from raising (a replaced
    # character can never match a hex digest).
    return hmac.compare_digest(
        expected.encode("utf-8"),
        str(supplied).encode("utf-8", errors="replace"),
    )
def method_handler(
    request: dict[str, Any],
    ctx: dict[str, Any],
) -> tuple[Any, int]:
    """Validate, authenticate and dispatch one API call.

    Args:
        request: Mapping whose ``"body"`` entry holds the decoded JSON request.
        ctx: Mutable per-request context that the method handlers fill in.

    Returns:
        ``(response, code)``: the method's result with ``OK``, or an error
        payload with ``INVALID_REQUEST`` / ``FORBIDDEN``.
    """
    handlers = {"clients_interests": clients_interests, "online_score": online_score}
    body = request.get("body")
    if not isinstance(body, dict):
        # ``MethodRequest(**body)`` would raise TypeError for a JSON list,
        # number or string; report it as an invalid request instead.
        return "Request body must be a JSON object", INVALID_REQUEST
    req = MethodRequest(**body)
    if req.errors:  # type: ignore[attr-defined]
        return req.errors, INVALID_REQUEST  # type: ignore[attr-defined]
    if not check_auth(req):
        return "Forbidden", FORBIDDEN
    if req.method not in METHODS:
        return "Unavalible method provided", INVALID_REQUEST
    return handlers[req.method](req, ctx)
def online_score(
    request: MethodRequest, ctx: dict[str, Any]
) -> tuple[dict[str, Any], int]:
    """Handle the ``online_score`` method.

    The admin login gets ``int(ADMIN_SALT)`` as its score without any argument
    validation. For everyone else the arguments are validated, the names of
    the non-null arguments are stored in ``ctx["has"]`` and the score comes
    from ``get_score``.
    """
    if ADMIN_LOGIN == str(request.login):
        return {"score": int(ADMIN_SALT)}, OK

    raw_args = request.arguments
    validated = OnlineScoreRequest(**raw_args)
    if validated.errors:  # type: ignore[attr-defined]
        return validated.errors, INVALID_REQUEST  # type: ignore[attr-defined]

    present = []
    for key, value in raw_args.items():
        if value is not None:
            present.append(key)
    ctx["has"] = present

    # NOTE(review): the raw arguments are forwarded, including keys that are
    # not declared fields - confirm get_score tolerates unknown keywords.
    score = get_score(**raw_args)
    return {"score": score}, OK
def clients_interests(
    request: MethodRequest, ctx: dict[str, Any]
) -> tuple[Union[dict[str, str], dict[int, list[str]]], int]:
    """Handle the ``clients_interests`` method.

    Validates the arguments, stores the number of requested ids in
    ``ctx["nclients"]`` and returns a mapping of each client id to the result
    of ``get_interests`` for it.
    """
    parsed = ClientsInterestsRequest(**request.arguments)
    if parsed.errors:  # type: ignore[attr-defined]
        return parsed.errors, INVALID_REQUEST  # type: ignore[attr-defined]

    ids = parsed.client_ids
    ctx["nclients"] = len(ids)

    interests = {}
    for client_id in ids:
        interests[client_id] = get_interests(client_id)
    return interests, OK
class MainHTTPHandler(BaseHTTPRequestHandler):
    """HTTP handler that routes JSON POST requests to API handlers.

    ``router`` maps a URL path (without slashes) to a callable taking
    ``(request, context)`` and returning ``(response, code)``.
    """

    router: dict[str, Callable] = {"method": method_handler}

    # The annotation is quoted so it is never evaluated at definition time.
    # NOTE(review): email.message.Message may not be subscriptable at runtime
    # on every interpreter version - confirm.
    def get_request_id(self, headers: "Message[str, str]") -> str:
        """Return the caller-supplied request id, or a fresh random one.

        The original ``HTTP_X_REQUEST_ID`` name is checked first for backward
        compatibility; ``X-Request-Id`` is the form a header normally has on
        the wire.
        """
        supplied = headers.get("HTTP_X_REQUEST_ID") or headers.get("X-Request-Id")
        return supplied or uuid.uuid4().hex

    def do_POST(self) -> None:
        """Read a JSON body, dispatch it by path and write a JSON reply.

        Status codes: ``BAD_REQUEST`` for an unreadable or non-JSON body,
        ``NOT_FOUND`` for an unknown path, ``INVALID_REQUEST`` for a body that
        is not a JSON object, ``INTERNAL_ERROR`` when the handler raises,
        otherwise whatever the handler returns.
        """
        response: Any = {}
        code = OK
        context: dict[str, Any] = {"request_id": self.get_request_id(self.headers)}
        request: Any = None
        raw_body = b""
        # Tracked separately from ``request`` so that valid-but-falsy JSON
        # such as ``{}`` is still processed instead of being answered with 200.
        parsed = False
        try:
            length = int(self.headers["Content-Length"])
            if length < 0:
                # read(-1) would block until the client closes the connection.
                raise ValueError("negative Content-Length")
            raw_body = self.rfile.read(length)
            request = json.loads(raw_body)
            parsed = True
        except Exception as exc:
            # Deliberately broad: any failure to obtain a JSON body is a bad request.
            logging.info("Bad request %s: %s", context["request_id"], exc)
            code = BAD_REQUEST

        if parsed:
            path = self.path.strip("/")
            # NOTE(review): the body is logged verbatim, including the token.
            logging.info(
                "%s: %s %s",
                self.path,
                raw_body.decode("utf-8", errors="replace"),
                context["request_id"],
            )
            handler = self.router.get(path)
            if handler is None:
                code = NOT_FOUND
            elif not isinstance(request, dict):
                code = INVALID_REQUEST
            else:
                try:
                    response, code = handler(
                        {"body": request, "headers": self.headers},
                        context,
                    )
                except Exception as e:
                    logging.exception("Unexpected error: %s", e)
                    code = INTERNAL_ERROR

        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        if code not in ERRORS:
            r = {"response": response, "code": code}
        else:
            r = {"error": response or ERRORS.get(code, "Unknown Error"), "code": code}
        context.update(r)
        logging.info(context)
        self.wfile.write(json.dumps(r).encode("utf-8"))
if __name__ == "__main__":
    # Command line: -p/--port to listen on, -l/--log file (default: stderr).
    parser = ArgumentParser()
    parser.add_argument("-p", "--port", action="store", type=int, default=8080)
    parser.add_argument("-l", "--log", action="store", default=None)
    args = parser.parse_args()
    logging.basicConfig(
        filename=args.log,
        level=logging.INFO,
        format="[%(asctime)s] %(levelname).1s %(message)s",
        datefmt="%Y.%m.%d %H:%M:%S",
    )
    # The context manager closes the listening socket on every exit path,
    # not only after Ctrl-C.
    with HTTPServer(("localhost", args.port), MainHTTPHandler) as server:
        logging.info("Starting server at %s", args.port)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the server.
            pass