-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapi_cache.py
More file actions
494 lines (432 loc) · 22 KB
/
api_cache.py
File metadata and controls
494 lines (432 loc) · 22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
# api_cache.py
import os
import re
import ipaddress
import hashlib
import asyncio
import uuid
import aiohttp
import mimetypes
import urllib.parse
from urllib.parse import urlparse
from aiohttp import web
# Per-video download locks, keyed by URL hash, so that each distinct URL is
# locked independently of the others.
_video_download_locks = {}
# Guards every read/modify of the two dictionaries in this group.
_video_locks_lock = asyncio.Lock()
_video_lock_refs = {} # {url_hash: int} reference counter, used instead of lock._waiters
def _scan_dir_stats(dir_path):
    """Count the regular files directly inside *dir_path* and sum their sizes.

    Sub-directories are not descended into and symlinks are not followed.
    A missing directory yields ``(0, 0)``. Filesystem errors are swallowed:
    a file whose size cannot be read is still counted, and an error while
    listing the directory ends the scan with whatever was tallied so far.

    Returns:
        A ``(file_count, total_bytes)`` tuple.
    """
    file_count = 0
    byte_total = 0
    if not os.path.exists(dir_path):
        return file_count, byte_total
    try:
        with os.scandir(dir_path) as entries:
            for item in entries:
                if not item.is_file(follow_symlinks=False):
                    continue
                file_count += 1
                try:
                    byte_total += item.stat(follow_symlinks=False).st_size
                except OSError:
                    # The file may have vanished between listing and stat.
                    pass
    except OSError:
        pass
    return file_count, byte_total
def _clean_nested_url(url, endpoint):
    """Unwrap a URL that was repeatedly wrapped in this proxy's own endpoint.

    A polluted value looks like ``/community_hub/image?url=<encoded target>``,
    possibly nested several levels deep. Each pass removes the leading proxy
    prefix(es) and percent-decodes the remainder once, until the value no
    longer starts with the prefix.

    Only *leading* prefixes are removed. Text equal to the prefix that appears
    later inside the target URL (for example in one of its own query
    parameters) is preserved.

    Args:
        url: The raw ``url`` query value.
        endpoint: Endpoint name used to build the prefix, e.g. ``'image'``.

    Returns:
        The unwrapped URL, or *url* unchanged if it was not wrapped.
    """
    prefix = f'/community_hub/{endpoint}?url='
    while url.startswith(prefix):
        # Slice rather than str.replace(): replace() would also delete
        # occurrences of the prefix from the middle of the target URL.
        while url.startswith(prefix):
            url = url[len(prefix):]
        url = urllib.parse.unquote(url)
    return url
async def _stream_file_chunks(resp, file_path, start=0, end=None):
    """Copy a file into an already-prepared streaming response in 256 KiB chunks.

    Args:
        resp: Response object exposing an awaitable ``write(bytes)``.
        file_path: Path of the file to send.
        start: Byte offset to begin at.
        end: Inclusive last byte offset, or ``None`` to send through EOF.

    Streaming stops quietly when the requested span has been sent, the file
    is exhausted, or the write raises ``ConnectionResetError``,
    ``RuntimeError`` or ``BrokenPipeError``.
    """
    block_size = 256 * 1024
    with open(file_path, 'rb') as src:
        if start > 0:
            src.seek(start)
        # None means "unbounded": keep reading until EOF.
        bytes_left = None if end is None else end - start + 1
        while bytes_left is None or bytes_left > 0:
            want = block_size if bytes_left is None else min(block_size, bytes_left)
            piece = src.read(want)
            if not piece:
                return
            try:
                await resp.write(piece)
            except (ConnectionResetError, RuntimeError, BrokenPipeError):
                return
            if bytes_left is not None:
                bytes_left -= len(piece)
async def _get_video_lock(url_hash):
    """Return the download lock for *url_hash*, creating it on first use.

    Every call increments the reference count stored in ``_video_lock_refs``
    for that hash. The registry is read and updated while holding
    ``_video_locks_lock``.
    """
    async with _video_locks_lock:
        lock = _video_download_locks.get(url_hash)
        if lock is None:
            lock = asyncio.Lock()
            _video_download_locks[url_hash] = lock
            _video_lock_refs[url_hash] = 0
        _video_lock_refs[url_hash] += 1
        return lock
def _is_local_request(request):
    """Return True when the request's peer is this machine or in 10.0.0.0/8.

    Used to protect the cache management endpoints. The peer address is
    parsed with ``ipaddress`` instead of being compared as text, so every
    loopback address (all of 127.0.0.0/8 and ``::1``) is recognised, as are
    IPv4-mapped IPv6 forms such as ``::ffff:127.0.0.1``.

    Args:
        request: Object exposing a ``remote`` attribute with the peer address
            (may be ``None``).

    Returns:
        ``True`` for loopback or 10.x.x.x peers, ``False`` otherwise,
        including when the address is missing or cannot be parsed.

    NOTE(review): this trusts ``request.remote`` as-is; if the server sits
    behind a reverse proxy the value is presumably the proxy's address -
    confirm the deployment before relying on this check.
    """
    remote = (request.remote or "").strip()
    if not remote:
        return False
    if remote == "localhost":
        return True
    try:
        # Drop an IPv6 zone id ("fe80::1%eth0") before parsing.
        ip = ipaddress.ip_address(remote.split('%', 1)[0])
    except ValueError:
        return False
    # Judge an IPv4-mapped IPv6 address by the IPv4 address it embeds.
    mapped = getattr(ip, 'ipv4_mapped', None)
    if mapped is not None:
        ip = mapped
    if ip.is_loopback:
        return True
    return ip.version == 4 and ip in ipaddress.ip_network('10.0.0.0/8')
def _is_forbidden_target(url):
    """Return True when *url* must not be fetched by the proxy (SSRF guard).

    A URL is forbidden when:
      * it has no host or cannot be parsed (fail closed);
      * its host is an IP literal that is private, loopback, link-local,
        multicast, reserved, unspecified or otherwise not globally routable
        (IPv4-mapped IPv6 addresses are judged by the embedded IPv4 address);
      * its host is a legacy numeric IPv4 form accepted by ``inet_aton``
        (e.g. ``2130706433`` or ``127.1``) that maps to such an address;
      * its host is ``localhost``, any ``*.localhost`` name, or
        ``metadata.google.internal``.

    Limitation: host names are not resolved here, so a public-looking domain
    whose DNS record points at an internal address is not detected.

    Args:
        url: Absolute URL supplied by the client.

    Returns:
        ``True`` if the target is forbidden, ``False`` if it may be fetched.
    """
    # Local import: `socket` is not among this module's top-level imports.
    import socket

    try:
        host = urlparse(url).hostname
        if not host:
            return True
        # Normalise: drop an IPv6 zone id and the DNS root dot, fold case
        # ("LOCALHOST." and "localhost" are the same name).
        host = host.split('%', 1)[0].rstrip('.').lower()
        if not host:
            return True

        try:
            ip = ipaddress.ip_address(host)
        except ValueError:
            try:
                # Decimal/hex/short IPv4 spellings are rejected by
                # ipaddress but are still treated as addresses by resolvers.
                ip = ipaddress.ip_address(socket.inet_aton(host))
            except (OSError, ValueError):
                ip = None

        if ip is None:
            # A real host name: block well-known internal names.
            dangerous_hosts = ('localhost', 'metadata.google.internal')
            return host in dangerous_hosts or host.endswith('.localhost')

        mapped = getattr(ip, 'ipv4_mapped', None)
        if mapped is not None:
            ip = mapped
        return (
            ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_reserved
            or ip.is_unspecified
            or not ip.is_global  # also covers shared space 100.64.0.0/10
        )
    except Exception:
        # Fail closed: anything unparseable is treated as forbidden.
        return True
def _cleanup_empty_cache(local_path, url_hash, ext):
    """Delete *local_path* if it exists and is a zero-byte (broken) cache file.

    Args:
        local_path: Path of the cache file to inspect.
        url_hash: Hash part of the cache file name (used in the log line only).
        ext: Extension part of the cache file name (used in the log line only).

    Removal errors are ignored; a non-empty or missing file is left alone.
    """
    if not os.path.exists(local_path) or os.path.getsize(local_path) != 0:
        return
    try:
        os.remove(local_path)
        print(f"[ComfyUI-Ranking] 🧹 已清理空缓存文件: {url_hash}.{ext}")
    except OSError:
        # Best effort: the file may already be gone or be locked.
        pass
# Walk up from this file: its directory, that directory's parent, and the
# grandparent. NOTE(review): the names assume this plugin lives in
# ComfyUI/custom_nodes/<plugin>/ - confirm against the install layout.
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
CUSTOM_NODES_DIR = os.path.dirname(THIS_DIR)
COMFY_ROOT_DIR = os.path.dirname(CUSTOM_NODES_DIR)
# Images are cached under <root>/models/cache/images
CACHE_ROOT_DIR = os.path.join(COMFY_ROOT_DIR, "models", "cache")
IMAGE_CACHE_DIR = os.path.join(CACHE_ROOT_DIR, "images")
# Video cache directory (kept separate from images)
VIDEO_CACHE_DIR = os.path.join(CACHE_ROOT_DIR, "videos")
# Both cache directories are created at import time.
os.makedirs(IMAGE_CACHE_DIR, exist_ok=True)
os.makedirs(VIDEO_CACHE_DIR, exist_ok=True)
# Video cache limit: 100MB, balancing typical short-video needs against disk usage
MAX_VIDEO_SIZE = 100 * 1024 * 1024 # 100MB
# Total time budget, in seconds, for one upstream video request.
VIDEO_TIMEOUT = aiohttp.ClientTimeout(total=300)
async def cache_image_handler(request):
    """Serve an image through a permanent on-disk cache.

    Core rule: a valid local cache file always wins; the network download is
    only a fallback.

    Flow:
      1. Read and unwrap the ``url`` query parameter (400 if missing).
      2. Rewrite legacy direct Hugging Face dataset links to the cloud proxy.
      3. Redirect values that do not start with ``http``; reject forbidden
         (internal) targets with 403.
      4. Serve ``<md5(url)>.<ext>`` from the image cache when it exists and
         is non-empty.
      5. Otherwise download it (30 s total timeout), store it, and serve it.

    The download is written to a ``.tmp.<8 hex>`` sibling and moved into
    place with ``os.replace`` only after the write succeeded, so a failed
    write can never leave a truncated file that would later be served as a
    valid cache entry.

    Returns:
        The cached file on success; 400/403 for bad input; the upstream
        status when it is not 200; 502/504/500 on connection error, timeout
        or any other failure.

    Raises:
        web.HTTPFound: when the URL does not start with ``http``.
    """
    url = request.query.get("url")
    if not url:
        return web.Response(status=400, text="Missing url")
    url = _clean_nested_url(url, 'image')

    # Older clients still send direct Hugging Face links that now answer 401
    # because the dataset is private; route those through the cloud proxy.
    if url.startswith("https://huggingface.co/datasets/ZHIWEI666/ComfyUI-Ranking/resolve/main/"):
        url = "https://zhiwei666-comfyui-ranking-api.hf.space/api/image_proxy?url=" + urllib.parse.quote(url)

    if not url.startswith('http'):
        raise web.HTTPFound(location=url)
    if _is_forbidden_target(url):
        return web.Response(status=403, text="Forbidden target address")

    # Cache file name: md5 of the URL plus the URL's extension (default jpg).
    url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
    ext = url.split('.')[-1].split('?')[0]
    if len(ext) > 4 or not ext.isalnum():
        ext = "jpg"
    local_path = os.path.join(IMAGE_CACHE_DIR, f"{url_hash}.{ext}")

    # Priority 1: a non-empty cache file is returned immediately.
    if os.path.exists(local_path) and os.path.getsize(local_path) > 0:
        content_type, _ = mimetypes.guess_type(local_path)
        return web.FileResponse(local_path, headers={'Content-Type': content_type or 'image/jpeg'})

    # Priority 2: download, then cache atomically.
    tmp_path = f"{local_path}.tmp.{uuid.uuid4().hex[:8]}"
    try:
        async with aiohttp.ClientSession() as session:
            # Browser-like User-Agent so the origin does not block the request.
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
            # ssl=False works around certificate errors in bundled ComfyUI
            # distributions; the 30 s timeout matches the front end.
            async with session.get(url, headers=headers, ssl=False, timeout=aiohttp.ClientTimeout(total=30)) as response:
                # Redirects are followed automatically; never hand back data
                # that ended up being read from an internal address.
                if _is_forbidden_target(str(response.url)):
                    return web.Response(status=403, text="Forbidden target address")
                if response.status != 200:
                    # Forward the upstream status code unchanged.
                    print(f"[ComfyUI-Ranking] ⚠️ 图片下载失败 (状态码: {response.status}): {url[:80]}...")
                    return web.Response(status=response.status, text=f"Upstream returned {response.status}")
                content = await response.read()

        if not content:
            # An empty body would only create an invalid cache entry.
            return web.Response(status=502, text="Upstream returned empty body")

        os.makedirs(IMAGE_CACHE_DIR, exist_ok=True)
        with open(tmp_path, "wb") as f:
            f.write(content)
        os.replace(tmp_path, local_path)
        print(f"[ComfyUI-Ranking] ✅ 成功下载并缓存图片: {url_hash}.{ext}")
        content_type, _ = mimetypes.guess_type(local_path)
        return web.FileResponse(local_path, headers={'Content-Type': content_type or 'image/jpeg'})
    except asyncio.TimeoutError as e:
        print(f"[ComfyUI-Ranking] ⚠️ 图片代理超时: {url[:80]}... 错误: {str(e)}")
        _cleanup_empty_cache(local_path, url_hash, ext)
        return web.Response(status=504, text="Image proxy timeout")
    except aiohttp.ClientError as e:
        print(f"[ComfyUI-Ranking] ⚠️ 图片代理连接错误: {url[:80]}... 错误: {str(e)}")
        _cleanup_empty_cache(local_path, url_hash, ext)
        return web.Response(status=502, text=f"Image proxy connection error: {str(e)}")
    except Exception as e:
        print(f"[ComfyUI-Ranking] ⚠️ 图片代理内部错误: {url[:80]}... 错误: {str(e)}")
        _cleanup_empty_cache(local_path, url_hash, ext)
        return web.Response(status=500, text=f"Image proxy internal error: {str(e)}")
    finally:
        # After a successful os.replace the temp file no longer exists; on
        # every failure path this removes the partial download.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def _parse_byte_range(range_header, file_size):
    """Parse a single-range ``Range: bytes=...`` header against *file_size*.

    Returns:
        ``(start, end)`` as inclusive offsets when the range is satisfiable,
        or ``None`` when it is well-formed but unsatisfiable.

    Raises:
        ValueError: when the header is malformed, uses another unit, or
            requests several ranges (not supported here).
    """
    unit, eq, spec = range_header.partition('=')
    if not eq or unit.strip().lower() != 'bytes' or ',' in spec:
        raise ValueError("Unsupported Range header")
    first, dash, last = (part.strip() for part in spec.partition('-'))
    if not dash:
        raise ValueError("Invalid Range format")
    if first:
        start = int(first)
        end = int(last) if last else file_size - 1
    elif last:
        # Suffix form "bytes=-N": the final N bytes of the file.
        start = max(0, file_size - int(last))
        end = file_size - 1
    else:
        raise ValueError("Invalid Range format")
    # An end position at or past EOF means "through the end of the file".
    end = min(end, file_size - 1)
    if start < 0 or start >= file_size or end < start:
        return None
    return start, end


async def _serve_video_file(request, file_path):
    """Stream a local video file, honouring HTTP Range requests (needed for seeking).

    Behaviour:
      * no ``Range`` header: 200 with the whole file;
      * satisfiable single range: 206 with ``Content-Range``; an end
        position beyond the file is clamped to the last byte;
      * unsatisfiable range: 416 with ``Content-Range: bytes */<size>``;
      * malformed or multi-range header: the header is ignored and the
        whole file is sent with 200.

    Args:
        request: Incoming aiohttp request.
        file_path: Path of the cached video to send.

    Returns:
        The prepared stream response, or a plain 416 response.
    """
    file_size = os.path.getsize(file_path)
    content_type, _ = mimetypes.guess_type(file_path)
    content_type = content_type or 'video/mp4'

    byte_range = None
    range_header = request.headers.get('Range')
    if range_header:
        try:
            byte_range = _parse_byte_range(range_header, file_size)
        except ValueError:
            # Ignore a Range header we cannot interpret.
            byte_range = None
        else:
            if byte_range is None:
                return web.Response(
                    status=416,
                    text="Range Not Satisfiable",
                    headers={'Content-Range': f'bytes */{file_size}'},
                )

    if byte_range is not None:
        start, end = byte_range
        resp = web.StreamResponse(status=206, headers={
            'Content-Type': content_type,
            'Accept-Ranges': 'bytes',
            'Content-Range': f'bytes {start}-{end}/{file_size}',
            'Content-Length': str(end - start + 1),
        })
    else:
        start, end = 0, None
        resp = web.StreamResponse(status=200, headers={
            'Content-Type': content_type,
            'Accept-Ranges': 'bytes',
            'Content-Length': str(file_size),
        })

    await resp.prepare(request)
    try:
        await _stream_file_chunks(resp, file_path, start, end)
    finally:
        try:
            await resp.write_eof()
        except Exception:
            # Best effort: the client may already have gone away.
            pass
    return resp
async def _forward_video_uncached(request, response, content_length, local_path):
    """Stream an upstream video straight to the client without caching it."""
    content_type = response.headers.get('Content-Type') or mimetypes.guess_type(local_path)[0] or 'video/mp4'
    stream_resp = web.StreamResponse(status=200, headers={
        'Content-Type': content_type,
        'Accept-Ranges': 'bytes',
    })
    stream_resp.headers['Content-Length'] = content_length
    await stream_resp.prepare(request)
    try:
        async for chunk in response.content.iter_chunked(256 * 1024):
            try:
                await stream_resp.write(chunk)
            except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, RuntimeError):
                break
    finally:
        try:
            await stream_resp.write_eof()
        except Exception:
            # Best effort: the client may already have gone away.
            pass
    return stream_resp


async def _tee_video_to_cache(request, response, content_length, local_path, url_hash):
    """Forward an upstream video to the client while writing it to the cache.

    The body is written to a ``.tmp.<8 hex>`` file and promoted to
    *local_path* only when the upstream stream ended normally, stayed within
    ``MAX_VIDEO_SIZE`` and matches ``Content-Length`` (when one was sent).
    If the client disconnects, the download continues so the cache can still
    be filled. If the size limit is exceeded, caching stops but the client
    keeps receiving the stream.
    """
    os.makedirs(VIDEO_CACHE_DIR, exist_ok=True)
    content_type = response.headers.get('Content-Type', 'video/mp4')
    headers = {
        'Content-Type': content_type,
        'Accept-Ranges': 'bytes',
        'Cache-Control': 'public, max-age=86400',
    }
    if content_length:
        headers['Content-Length'] = content_length
    resp = web.StreamResponse(status=200, headers=headers)
    await resp.prepare(request)

    tmp_path = local_path + f'.tmp.{uuid.uuid4().hex[:8]}'
    downloaded_size = 0
    expected_size = int(content_length) if content_length else None
    client_alive = True
    caching = True
    source_download_complete = False
    try:
        try:
            with open(tmp_path, 'wb') as f:
                async for chunk in response.content.iter_chunked(256 * 1024):
                    downloaded_size += len(chunk)
                    if caching:
                        if downloaded_size > MAX_VIDEO_SIZE:
                            # Too large to cache: stop writing, keep forwarding.
                            caching = False
                            f.close()
                            print(f"[ComfyUI-Ranking] ⚠️ 视频超过最大缓存限制 ({MAX_VIDEO_SIZE} bytes),停止缓存仅转发: {url_hash}")
                        else:
                            f.write(chunk)
                    if client_alive:
                        try:
                            await resp.write(chunk)
                            await asyncio.sleep(0)
                        except (ConnectionResetError, RuntimeError, BrokenPipeError):
                            client_alive = False
                            if caching:
                                print(f"[ComfyUI-Ranking] ℹ️ 客户端断连,继续后台缓存: {url_hash}")
                    if not caching and not client_alive:
                        # Nobody is consuming the data any more.
                        break
                else:
                    # Reached only when the upstream stream ended without a
                    # break, i.e. the source was downloaded completely.
                    source_download_complete = True
        except Exception as e:
            # The upstream download failed (not caused by the client leaving).
            print(f"[ComfyUI-Ranking] ⚠️ 源站下载中断: {str(e)}")
        finally:
            try:
                await resp.write_eof()
            except Exception:
                pass

        size_ok = downloaded_size > 0 and (not expected_size or downloaded_size == expected_size)
        if source_download_complete and caching and size_ok:
            try:
                os.replace(tmp_path, local_path)
            except OSError as e:
                # The response has already been sent; just report the failure.
                print(f"[ComfyUI-Ranking] ⚠️ 视频缓存写入失败: {url_hash} 错误: {str(e)}")
            else:
                if expected_size:
                    print(f"[ComfyUI-Ranking] ✅ 视频已缓存: {url_hash} ({downloaded_size} bytes)")
                else:
                    print(f"[ComfyUI-Ranking] ✅ 视频已缓存(无Content-Length): {url_hash} ({downloaded_size} bytes)")
    finally:
        # Runs on every exit path, including cancellation, so partial
        # downloads never pile up in the cache directory.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
    return resp


async def cache_video_handler(request):
    """Video proxy/cache endpoint: ``/community_hub/video?url=...``.

    Key design points:
      - videos are cached in their own directory (``VIDEO_CACHE_DIR``);
      - cached files are served with HTTP Range support (needed for seeking);
      - data is streamed, never loaded into memory in one piece;
      - a video larger than ``MAX_VIDEO_SIZE`` is forwarded but not cached;
      - the upstream request uses ``VIDEO_TIMEOUT``;
      - downloads of the same URL are serialised by a per-URL lock, and the
        cache is re-checked once the lock is held.

    Returns:
        A streamed video response on success; 400/403 for bad input; the
        upstream status when it is not 200; 502/504/500 on connection error,
        timeout or any other failure.

    Raises:
        web.HTTPFound: when the URL does not start with ``http``.
    """
    url = request.query.get("url")
    if not url:
        return web.Response(status=400, text="Missing url")
    url = _clean_nested_url(url, 'video')
    if not url.startswith('http'):
        raise web.HTTPFound(location=url)
    if _is_forbidden_target(url):
        return web.Response(status=403, text="Forbidden target address")

    # Cache file name: md5 of the URL plus a whitelisted video extension.
    url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
    ext = url.split('.')[-1].split('?')[0].lower()
    valid_exts = {'mp4', 'webm', 'mov', 'avi', 'mkv'}
    if ext not in valid_exts:
        ext = 'mp4'
    local_path = os.path.join(VIDEO_CACHE_DIR, f"{url_hash}.{ext}")

    # Priority 1: a non-empty cache file is served immediately.
    if os.path.exists(local_path) and os.path.getsize(local_path) > 0:
        return await _serve_video_file(request, local_path)

    # Priority 2: download from the origin.
    lock = await _get_video_lock(url_hash)
    try:
        async with lock:
            # Another request may have finished the download while this one
            # was waiting for the lock.
            if os.path.exists(local_path) and os.path.getsize(local_path) > 0:
                return await _serve_video_file(request, local_path)
            try:
                async with aiohttp.ClientSession() as session:
                    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
                    async with session.get(url, headers=headers, ssl=False, timeout=VIDEO_TIMEOUT) as response:
                        # Redirects are followed automatically; never relay
                        # data that was read from an internal address.
                        if _is_forbidden_target(str(response.url)):
                            return web.Response(status=403, text="Forbidden target address")
                        if response.status != 200:
                            print(f"[ComfyUI-Ranking] ⚠️ 视频下载失败 (状态码: {response.status}): {url[:80]}...")
                            return web.Response(status=response.status, text=f"Upstream returned {response.status}")
                        content_length = response.headers.get('Content-Length')
                        if content_length and int(content_length) > MAX_VIDEO_SIZE:
                            # Declared size is over the limit: forward only.
                            return await _forward_video_uncached(request, response, content_length, local_path)
                        return await _tee_video_to_cache(request, response, content_length, local_path, url_hash)
            except asyncio.TimeoutError as e:
                print(f"[ComfyUI-Ranking] ⚠️ 视频代理超时: {url[:80]}... 错误: {str(e)}")
                _cleanup_empty_cache(local_path, url_hash, ext)
                return web.Response(status=504, text="Video proxy timeout")
            except aiohttp.ClientError as e:
                print(f"[ComfyUI-Ranking] ⚠️ 视频代理连接错误: {url[:80]}... 错误: {str(e)}")
                _cleanup_empty_cache(local_path, url_hash, ext)
                return web.Response(status=502, text=f"Video proxy connection error: {str(e)}")
            except Exception as e:
                print(f"[ComfyUI-Ranking] ⚠️ 视频代理内部错误: {url[:80]}... 错误: {str(e)}")
                _cleanup_empty_cache(local_path, url_hash, ext)
                return web.Response(status=500, text=f"Video proxy internal error: {str(e)}")
    finally:
        # Drop this request's reference; when the count reaches zero the lock
        # object is removed so the registry does not grow without bound.
        async with _video_locks_lock:
            if url_hash in _video_lock_refs:
                _video_lock_refs[url_hash] -= 1
                if _video_lock_refs[url_hash] <= 0:
                    _video_download_locks.pop(url_hash, None)
                    _video_lock_refs.pop(url_hash, None)
async def cache_stats_handler(request):
    """GET /community_hub/cache/stats - report image and video cache usage.

    Responds with 403 unless ``_is_local_request`` accepts the caller.
    Otherwise returns a JSON object holding the file count and total byte
    size of the image cache and of the video cache.
    """
    if _is_local_request(request):
        img_files, img_bytes = _scan_dir_stats(IMAGE_CACHE_DIR)
        vid_files, vid_bytes = _scan_dir_stats(VIDEO_CACHE_DIR)
        payload = {
            "image_count": img_files,
            "image_size": img_bytes,
            "video_count": vid_files,
            "video_size": vid_bytes,
        }
        return web.json_response(payload)
    return web.Response(status=403, text="Forbidden: local access only")
async def cache_clear_handler(request):
    """POST /community_hub/cache/clear - delete cached files.

    The JSON body must be an object whose ``target`` is ``"all"``,
    ``"images"`` or ``"videos"``. Only regular files directly inside the
    selected cache directories are removed; files named like an in-progress
    download (``*.tmp.<8 hex>``) are skipped. Per-file and per-directory
    filesystem errors are ignored so one bad entry does not abort the run.

    Returns:
        403 unless ``_is_local_request`` accepts the caller; 400 when the
        body is not valid JSON or ``target`` is missing/invalid (including a
        JSON body that is not an object); otherwise a JSON object with
        ``cleared_count`` and ``freed_size`` (bytes).
    """
    if not _is_local_request(request):
        return web.Response(status=403, text="Forbidden: local access only")
    try:
        body = await request.json()
    except Exception:
        return web.Response(status=400, text="Invalid JSON body")

    # Valid JSON need not be an object (e.g. `[]` or `"all"`); treat such a
    # body as having no target instead of failing on `.get`.
    target = body.get("target") if isinstance(body, dict) else None
    if target not in ("all", "images", "videos"):
        return web.Response(status=400, text="Invalid target. Must be 'all', 'images', or 'videos'")

    dirs_by_target = {
        "all": [IMAGE_CACHE_DIR, VIDEO_CACHE_DIR],
        "images": [IMAGE_CACHE_DIR],
        "videos": [VIDEO_CACHE_DIR],
    }
    # Names of temp files that belong to a download still in progress.
    temp_name = re.compile(r'\.tmp\.[a-f0-9]{8}$')

    cleared_count = 0
    freed_size = 0
    for dir_path in dirs_by_target[target]:
        if not os.path.exists(dir_path):
            continue
        try:
            with os.scandir(dir_path) as it:
                for entry in it:
                    if not entry.is_file(follow_symlinks=False):
                        continue
                    if temp_name.search(entry.name):
                        continue
                    try:
                        file_size = entry.stat(follow_symlinks=False).st_size
                        os.remove(entry.path)
                    except OSError:
                        # Already gone or not removable: skip this file.
                        continue
                    cleared_count += 1
                    freed_size += file_size
        except OSError:
            pass
    return web.json_response({
        "cleared_count": cleared_count,
        "freed_size": freed_size,
    })