|
1 | 1 | import re |
2 | 2 | import base64 |
| 3 | +import logging |
3 | 4 | from operator import itemgetter |
| 5 | +from typing import List, Tuple, Optional |
| 6 | + |
| 7 | +logger = logging.getLogger(__name__) |
4 | 8 |
|
5 | 9 | BK_SEP = '//_//' |
6 | 10 | BK_BLOCKS = [ |
|
11 | 15 | 'JCQjISFAIyFAIyM=' |
12 | 16 | ] |
13 | 17 |
|
| 18 | +# Компилируем один раз |
| 19 | +QUALITY_PATTERN = re.compile(r'\[((\d+)[^]]+)].+?(http.+?mp4)', re.DOTALL) |
| 20 | + |
| 21 | + |
| 22 | +def parse_streams(data: str) -> List[Tuple[str, int, str]]: |
| 23 | + """ |
| 24 | + Парсит потоки из обфусцированной или plain text строки. |
| 25 | + Сохраняет оригинальную логику: берет первый URL после качества. |
| 26 | +
|
| 27 | + Returns: |
| 28 | + [(quality_name, quality_int, url), ...] отсортировано по качеству desc |
| 29 | +
|
| 30 | + Raises: |
| 31 | + ValueError: Если нет валидных потоков |
| 32 | + """ |
| 33 | + if not isinstance(data, str) or not data.strip(): |
| 34 | + raise ValueError("Input must be non-empty string") |
| 35 | + |
| 36 | + # Шаг 1: Очистка (как в оригинале) |
| 37 | + cleaned = _clean_obfuscation(data) |
| 38 | + |
| 39 | + # Шаг 2: Декодирование с автоопределением формата |
| 40 | + decoded = _decode(cleaned) |
| 41 | + if not decoded: |
| 42 | + raise ValueError("Failed to decode streams data") |
| 43 | + |
| 44 | + # Шаг 3: Парсинг (оригинальная логика) |
| 45 | + streams = _parse_quality_blocks(decoded) |
| 46 | + |
| 47 | + if not streams: |
| 48 | + logger.warning(f"No streams parsed from: {decoded[:200]}...") |
| 49 | + raise ValueError("No streams found in decoded data") |
| 50 | + |
| 51 | + # Шаг 4: Сортировка (оригинальная) |
| 52 | + return sorted(streams, key=itemgetter(1), reverse=True) |
| 53 | + |
| 54 | + |
| 55 | +def _clean_obfuscation(data: str) -> str: |
| 56 | + """Удаляет escape-символы и мусорные блоки.""" |
| 57 | + result = data.replace('\\', '') |
| 58 | + for block in BK_BLOCKS: |
| 59 | + result = result.replace(BK_SEP + block, '') |
| 60 | + return result |
| 61 | + |
| 62 | + |
| 63 | +def _decode(data: str) -> Optional[str]: |
| 64 | + """ |
| 65 | + Определяет формат и декодирует. |
| 66 | + Plain text: начинается с [ |
| 67 | + Base64: остальное |
| 68 | + """ |
| 69 | + # Plain text (новый формат без обфускации) |
| 70 | + if data.startswith('['): |
| 71 | + logger.debug("Detected plain text format") |
| 72 | + return data |
| 73 | + |
| 74 | + # Base64 (старый формат) |
| 75 | + try: |
| 76 | + # Проверяем минимальную длину для base64 |
| 77 | + if len(data) < 4: |
| 78 | + return None |
| 79 | + |
| 80 | + decoded_bytes = base64.b64decode(data[2:]) |
| 81 | + return decoded_bytes.decode('utf-8') |
| 82 | + except (ValueError, UnicodeDecodeError) as e: |
| 83 | + logger.debug(f"Base64 decode failed: {e}") |
| 84 | + return None |
| 85 | + |
| 86 | + |
| 87 | +def _parse_quality_blocks(decoded: str) -> List[Tuple[str, int, str]]: |
| 88 | + """ |
| 89 | + Парсит блоки [quality]url. |
| 90 | + Сохраняет оригинальное поведение: берет первый http...mp4 после [quality]. |
| 91 | + """ |
| 92 | + streams = [] |
| 93 | + |
| 94 | + for match in QUALITY_PATTERN.finditer(decoded): |
| 95 | + quality_name = match.group(1) # "1080p" или "1080p Ultra" |
| 96 | + quality_num = int(match.group(2)) # 1080 |
| 97 | + url = match.group(3) # http...mp4 (первый найденный после [quality]) |
| 98 | + |
| 99 | + # Валидация URL |
| 100 | + if _is_valid_url(url): |
| 101 | + # Очистка HLS-суффикса для совместимости |
| 102 | + clean_url = url.replace(':hls:manifest.m3u8', '') |
| 103 | + streams.append((quality_name, quality_num, clean_url)) |
| 104 | + else: |
| 105 | + logger.warning(f"Invalid URL parsed: {url[:100]}...") |
| 106 | + |
| 107 | + return streams |
| 108 | + |
14 | 109 |
|
15 | | -def parse_streams(salted): |
16 | | - salted = salted.replace('\\', '') |
17 | | - for bk in BK_BLOCKS: |
18 | | - salted = salted.replace(BK_SEP + bk, '') |
19 | | - decoded_streams = base64.b64decode(salted[2:]).decode('utf-8') |
20 | | - parsed_streams = [] |
21 | | - for name, quality, url in re.findall(r'\[((\d+)[^]]+)].+?(http.+?mp4)', decoded_streams): |
22 | | - parsed_streams.append(( |
23 | | - name, int(quality), url |
24 | | - )) |
25 | | - return sorted(parsed_streams, key=itemgetter(1), reverse=True) |
| 110 | +def _is_valid_url(url: str) -> bool: |
| 111 | + """Проверяет что URL начинается с http и содержит домен.""" |
| 112 | + return ( |
| 113 | + url.startswith(('http://', 'https://')) and |
| 114 | + len(url) > 10 and |
| 115 | + '.' in url[8:] # после http:// или https:// |
| 116 | + ) |
0 commit comments