|
11 | 11 | import urllib.error |
12 | 12 | import urllib.request |
13 | 13 | import uuid |
14 | | -from typing import Any, Callable, Dict, Optional |
| 14 | +from typing import Any, Callable, Dict, Literal, Optional, TypedDict, Union |
15 | 15 | from cryptography.exceptions import InvalidSignature |
16 | 16 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey |
17 | 17 |
|
18 | 18 |
|
19 | 19 | DEFAULT_API_BASE_URL = "https://auth.authforge.cc" |
20 | 20 | RATE_LIMIT_RETRY_DELAYS = (2, 5) |
21 | 21 | NETWORK_RETRY_DELAY = 2 |
| 22 | +class ValidateLicenseSuccess(TypedDict): |
| 23 | + valid: Literal[True] |
| 24 | + session_token: str |
| 25 | + expires_in: int |
| 26 | + session_data: Dict[str, Any] |
| 27 | + app_variables: Optional[Dict[str, Any]] |
| 28 | + license_variables: Optional[Dict[str, Any]] |
| 29 | + key_id: Optional[str] |
| 30 | + |
| 31 | + |
| 32 | +class ValidateLicenseFailure(TypedDict): |
| 33 | + valid: Literal[False] |
| 34 | + code: str |
| 35 | + error: str |
| 36 | + |
| 37 | + |
| 38 | +ValidateLicenseResult = Union[ValidateLicenseSuccess, ValidateLicenseFailure] |
| 39 | + |
| 40 | + |
22 | 41 | KNOWN_SERVER_ERRORS = { |
23 | 42 | "invalid_app", |
24 | 43 | "invalid_key", |
@@ -109,6 +128,36 @@ def login(self, license_key: str) -> bool: |
109 | 128 | self._fail("login_failed", exc) |
110 | 129 | return False |
111 | 130 |
|
| 131 | + def validate_license(self, license_key: str) -> ValidateLicenseResult: |
| 132 | + """Validate like :meth:`login` (same /auth/validate + signatures) without storing |
| 133 | + session state or starting the heartbeat thread.""" |
| 134 | + if not license_key or not isinstance(license_key, str): |
| 135 | + raise ValueError("license_key must be a non-empty string") |
| 136 | + try: |
| 137 | + body: Dict[str, Any] = { |
| 138 | + "appId": self.app_id, |
| 139 | + "appSecret": self.app_secret, |
| 140 | + "licenseKey": license_key, |
| 141 | + "hwid": self._hwid, |
| 142 | + "nonce": self._generate_nonce(), |
| 143 | + } |
| 144 | + if self.ttl_seconds is not None: |
| 145 | + body["ttlSeconds"] = self.ttl_seconds |
| 146 | + response_obj = self._post_json("/auth/validate", body, skip_failure_hook=True) |
| 147 | + expected_nonce = str(body.get("nonce", "")).strip() |
| 148 | + parsed = self._parse_validate_success(response_obj, expected_nonce) |
| 149 | + return { |
| 150 | + "valid": True, |
| 151 | + "session_token": parsed["session_token"], |
| 152 | + "expires_in": parsed["expires_in"], |
| 153 | + "session_data": parsed["session_data"], |
| 154 | + "app_variables": parsed["app_variables"], |
| 155 | + "license_variables": parsed["license_variables"], |
| 156 | + "key_id": parsed["key_id"], |
| 157 | + } |
| 158 | + except Exception as exc: |
| 159 | + return {"valid": False, "code": str(exc), "error": str(exc)} |
| 160 | + |
112 | 161 | def self_ban( |
113 | 162 | self, |
114 | 163 | *, |
@@ -250,13 +299,9 @@ def _validate_and_store(self, license_key: str) -> None: |
250 | 299 | context="validate", |
251 | 300 | ) |
252 | 301 |
|
253 | | - def _apply_signed_response( |
254 | | - self, |
255 | | - response_obj: Dict[str, Any], |
256 | | - expected_nonce: str, |
257 | | - license_key: Optional[str], |
258 | | - context: str, |
259 | | - ) -> None: |
| 302 | + def _parse_validate_success( |
| 303 | + self, response_obj: Dict[str, Any], expected_nonce: str |
| 304 | + ) -> Dict[str, Any]: |
260 | 305 | status = response_obj.get("status") |
261 | 306 | if not self._is_success_status(status): |
262 | 307 | error_code = self._extract_server_error(response_obj) |
@@ -288,21 +333,46 @@ def _apply_signed_response( |
288 | 333 | if expires_in is None: |
289 | 334 | raise ValueError("missing_expiresIn") |
290 | 335 |
|
| 336 | + return { |
| 337 | + "session_token": session_token, |
| 338 | + "expires_in": int(expires_in), |
| 339 | + "session_data": dict(payload_json), |
| 340 | + "app_variables": self._extract_optional_map(payload_json.get("appVariables")), |
| 341 | + "license_variables": self._extract_optional_map( |
| 342 | + payload_json.get("licenseVariables") |
| 343 | + ), |
| 344 | + "key_id": key_id if isinstance(key_id, str) else None, |
| 345 | + "raw_payload_b64": raw_payload_b64, |
| 346 | + "signature": signature, |
| 347 | + } |
| 348 | + |
| 349 | + def _apply_signed_response( |
| 350 | + self, |
| 351 | + response_obj: Dict[str, Any], |
| 352 | + expected_nonce: str, |
| 353 | + license_key: Optional[str], |
| 354 | + context: str, |
| 355 | + ) -> None: |
| 356 | + parsed = self._parse_validate_success(response_obj, expected_nonce) |
| 357 | + _ = context |
| 358 | + |
291 | 359 | with self._lock: |
292 | 360 | if license_key is not None: |
293 | 361 | self._license_key = license_key |
294 | | - self._session_token = session_token |
295 | | - self._session_expires_in = int(expires_in) |
| 362 | + self._session_token = parsed["session_token"] |
| 363 | + self._session_expires_in = int(parsed["expires_in"]) |
296 | 364 | self._last_nonce = expected_nonce |
297 | | - self._raw_payload_b64 = raw_payload_b64 |
298 | | - self._signature = signature |
299 | | - self._key_id = key_id |
300 | | - self._session_data = dict(payload_json) |
301 | | - self._app_variables = self._extract_optional_map(payload_json.get("appVariables")) |
302 | | - self._license_variables = self._extract_optional_map(payload_json.get("licenseVariables")) |
| 365 | + self._raw_payload_b64 = parsed["raw_payload_b64"] |
| 366 | + self._signature = parsed["signature"] |
| 367 | + self._key_id = parsed["key_id"] |
| 368 | + self._session_data = dict(parsed["session_data"]) |
| 369 | + self._app_variables = parsed["app_variables"] |
| 370 | + self._license_variables = parsed["license_variables"] |
303 | 371 | self._authenticated = True |
304 | 372 |
|
305 | | - def _post_json(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]: |
| 373 | + def _post_json( |
| 374 | + self, path: str, data: Dict[str, Any], *, skip_failure_hook: bool = False |
| 375 | + ) -> Dict[str, Any]: |
306 | 376 | url = f"{self.api_base_url}{path}" |
307 | 377 | body = dict(data) |
308 | 378 | rate_attempt = 0 |
@@ -342,7 +412,8 @@ def _post_json(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]: |
342 | 412 | network_attempt += 1 |
343 | 413 | time.sleep(NETWORK_RETRY_DELAY) |
344 | 414 | continue |
345 | | - self._fail("network_error", exc) |
| 415 | + if not skip_failure_hook: |
| 416 | + self._fail("network_error", exc) |
346 | 417 | raise RuntimeError(f"url_error: {exc}") from exc |
347 | 418 |
|
348 | 419 | is_rate_limited = ( |
|
0 commit comments