6 changes: 2 additions & 4 deletions fastdeploy/entrypoints/openai/serving_chat.py
@@ -44,6 +44,7 @@
UsageInfo,
)
from fastdeploy.entrypoints.openai.response_processors import ChatResponseProcessor
from fastdeploy.entrypoints.openai.utils import MasterCheckMixin
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.trace.constants import LoggingEventName
from fastdeploy.trace.trace_logger import print as trace_print
@@ -66,7 +67,7 @@
NONES = itertools.repeat(None)


class OpenAIServingChat:
class OpenAIServingChat(MasterCheckMixin):
"""
OpenAI-style chat completions serving
"""
@@ -100,9 +101,6 @@ def __init__(
self.is_master_ip = True
api_server_logger.info(f"master ip: {self.master_ip}")

def _check_master(self):
return self.engine_client.is_master or self.is_master_ip

async def create_chat_completion(self, request: ChatCompletionRequest):
"""
Create a new chat completion using the specified parameters.
6 changes: 2 additions & 4 deletions fastdeploy/entrypoints/openai/serving_completion.py
@@ -41,6 +41,7 @@
PromptTokenUsageInfo,
UsageInfo,
)
from fastdeploy.entrypoints.openai.utils import MasterCheckMixin
from fastdeploy.trace.constants import LoggingEventName
from fastdeploy.trace.trace_logger import print as trace_print
from fastdeploy.utils import (
@@ -61,7 +62,7 @@
NONES = itertools.repeat(None)


class OpenAIServingCompletion:
class OpenAIServingCompletion(MasterCheckMixin):
def __init__(self, engine_client, models, pid, ips, max_waiting_time):
self.engine_client = engine_client
self.models = models
@@ -79,9 +80,6 @@ def __init__(self, engine_client, models, pid, ips, max_waiting_time):
self._is_process_response_dict_async = None
api_server_logger.info(f"master ip: {self.master_ip}")

def _check_master(self):
return self.engine_client.is_master or self.is_master_ip

async def create_completion(self, request: CompletionRequest):
"""
Create a completion for the given prompt.
24 changes: 24 additions & 0 deletions fastdeploy/entrypoints/openai/utils.py
@@ -30,6 +30,30 @@
from fastdeploy.metrics.stats import ZMQMetricsStats
from fastdeploy.utils import FlexibleArgumentParser, api_server_logger


class MasterCheckMixin:
"""
Mixin class providing master node checking functionality for serving classes.

This mixin provides a common implementation of the _check_master method
that verifies if the current node is the master node. It requires the
inheriting class to have `engine_client` and `is_master_ip` attributes.

Attributes required in the inheriting class:
engine_client: Engine client with is_master attribute
is_master_ip: Boolean indicating if current IP is the master IP
"""

def _check_master(self) -> bool:
"""
Check if current node is the master node.

Returns:
bool: True if current node is master, False otherwise
"""
return self.engine_client.is_master or self.is_master_ip


UVICORN_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
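For reference, the mixin only works when the inheriting class assigns both `engine_client` and `is_master_ip` before `_check_master()` is called, as the two serving classes above do in `__init__`. A minimal sketch of that contract (`DummyEngineClient` and `ExampleServing` are illustrative names, not part of the codebase):

```python
# Minimal sketch of the MasterCheckMixin contract; DummyEngineClient and
# ExampleServing are illustrative stand-ins, not part of the codebase.
from dataclasses import dataclass

from fastdeploy.entrypoints.openai.utils import MasterCheckMixin


@dataclass
class DummyEngineClient:
    is_master: bool = False  # the attribute the mixin reads on the engine client


class ExampleServing(MasterCheckMixin):
    def __init__(self, engine_client: DummyEngineClient, is_master_ip: bool):
        # _check_master() reads exactly these two attributes.
        self.engine_client = engine_client
        self.is_master_ip = is_master_ip


serving = ExampleServing(DummyEngineClient(is_master=True), is_master_ip=False)
assert serving._check_master()  # True: the engine client reports it is the master

serving = ExampleServing(DummyEngineClient(is_master=False), is_master_ip=False)
assert not serving._check_master()  # False: neither condition holds
```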
23 changes: 2 additions & 21 deletions fastdeploy/input/ernie4_5_vl_processor/ernie4_5_vl_processor.py
@@ -21,7 +21,7 @@

from fastdeploy.engine.request import Request
from fastdeploy.input.ernie4_5_processor import Ernie4_5Processor
from fastdeploy.input.utils import IDS_TYPE_FLAG, process_stop_token_ids
from fastdeploy.input.utils import IDS_TYPE_FLAG, check_mm_limits, process_stop_token_ids
from fastdeploy.utils import data_processor_logger

from .process import DataProcessor
@@ -180,26 +180,7 @@ def _parse_limits(self, limits):
return DEFAULT_LIMITS

def _check_mm_limits(self, item):
if isinstance(item, dict):
# Request contains prompt and multi_modal_data
mm_data = item
else:
# Request contains messages
mm_data = {"image": [], "video": []}

for message in item:
if isinstance(message.get("content"), list):
for part in message["content"]:
if part.get("type") == "image":
mm_data["image"].append(part)
elif part.get("type") == "video":
mm_data["video"].append(part)

for modality, data in mm_data.items():
if modality in self.limit_mm_per_prompt:
limit = self.limit_mm_per_prompt[modality]
if len(data) > limit:
raise ValueError(f"Too many {modality} items in prompt, " f"got {len(data)} but limit is {limit}")
check_mm_limits(item, self.limit_mm_per_prompt, type_check_mode="eq")

def process_request_dict(self, request, max_model_len=None):
"""process the input data"""
23 changes: 2 additions & 21 deletions fastdeploy/input/paddleocr_vl_processor/paddleocr_vl_processor.py
@@ -18,12 +18,12 @@

from fastdeploy.engine.request import Request
from fastdeploy.input.text_processor import DataProcessor as TextProcessor
from fastdeploy.input.utils import check_mm_limits, process_stop_token_ids
from fastdeploy.utils import data_processor_logger

from .process import DataProcessor

_SAMPLING_EPS = 1e-5
from fastdeploy.input.utils import process_stop_token_ids


class PaddleOCRVLProcessor(TextProcessor):
@@ -171,26 +171,7 @@ def _check_mm_limits(self, item):
Raises:
ValueError: If input exceeds configured limits
"""
if isinstance(item, dict):
# Request contains prompt and multi_modal_data
mm_data = item
else:
# Request contains messages
mm_data = {"image": [], "video": []}

for message in item:
if isinstance(message.get("content"), list):
for part in message["content"]:
if part.get("type") in ["image_url", "image"]:
mm_data["image"].append(part)
elif part.get("type") in ["video_url", "video"]:
mm_data["video"].append(part)

for modality, data in mm_data.items():
if modality in self.limit_mm_per_prompt:
limit = self.limit_mm_per_prompt[modality]
if len(data) > limit:
raise ValueError(f"Too many {modality} items in prompt, " f"got {len(data)} but limit is {limit}")
check_mm_limits(item, self.limit_mm_per_prompt, type_check_mode="in")

def process_request_dict(self, request, max_model_len=None):
"""
22 changes: 2 additions & 20 deletions fastdeploy/input/qwen3_vl_processor/qwen3_vl_processor.py
@@ -18,6 +18,7 @@

from fastdeploy.engine.request import Request
from fastdeploy.input.text_processor import DataProcessor as TextProcessor
from fastdeploy.input.utils import check_mm_limits
from fastdeploy.utils import data_processor_logger

from .process import DataProcessor
@@ -169,26 +170,7 @@ def _check_mm_limits(self, item):
Raises:
ValueError: If input exceeds configured limits
"""
if isinstance(item, dict):
# Request contains prompt and multi_modal_data
mm_data = item
else:
# Request contains messages
mm_data = {"image": [], "video": []}

for message in item:
if isinstance(message.get("content"), list):
for part in message["content"]:
if part.get("type") in ["image_url", "image"]:
mm_data["image"].append(part)
elif part.get("type") in ["video_url", "video"]:
mm_data["video"].append(part)

for modality, data in mm_data.items():
if modality in self.limit_mm_per_prompt:
limit = self.limit_mm_per_prompt[modality]
if len(data) > limit:
raise ValueError(f"Too many {modality} items in prompt, " f"got {len(data)} but limit is {limit}")
check_mm_limits(item, self.limit_mm_per_prompt, type_check_mode="in")

def process_request_dict(self, request, max_model_len=None):
"""
23 changes: 2 additions & 21 deletions fastdeploy/input/qwen_vl_processor/qwen_vl_processor.py
@@ -18,7 +18,7 @@

from fastdeploy.engine.request import Request
from fastdeploy.input.text_processor import DataProcessor as TextProcessor
from fastdeploy.input.utils import process_stop_token_ids
from fastdeploy.input.utils import check_mm_limits, process_stop_token_ids
from fastdeploy.utils import data_processor_logger

from .process import DataProcessor
@@ -170,26 +170,7 @@ def _check_mm_limits(self, item):
Raises:
ValueError: If input exceeds configured limits
"""
if isinstance(item, dict):
# Request contains prompt and multi_modal_data
mm_data = item
else:
# Request contains messages
mm_data = {"image": [], "video": []}

for message in item:
if isinstance(message.get("content"), list):
for part in message["content"]:
if part.get("type") in ["image_url", "image"]:
mm_data["image"].append(part)
elif part.get("type") in ["video_url", "video"]:
mm_data["video"].append(part)

for modality, data in mm_data.items():
if modality in self.limit_mm_per_prompt:
limit = self.limit_mm_per_prompt[modality]
if len(data) > limit:
raise ValueError(f"Too many {modality} items in prompt, " f"got {len(data)} but limit is {limit}")
check_mm_limits(item, self.limit_mm_per_prompt, type_check_mode="in")

def process_request_dict(self, request, max_model_len=None):
"""
55 changes: 54 additions & 1 deletion fastdeploy/input/utils.py
@@ -16,12 +16,65 @@

__all__ = [
"IDS_TYPE_FLAG",
"check_mm_limits",
]

IDS_TYPE_FLAG = {"text": 0, "image": 1, "video": 2, "audio": 3}


from typing import Any, Callable, Dict, List, Tuple
from typing import Any, Callable, Dict, List, Tuple, Union


def check_mm_limits(
item: Union[Dict[str, Any], List[Dict[str, Any]]],
limit_mm_per_prompt: Dict[str, int],
type_check_mode: str = "in",
) -> None:
"""
Validate multimodal inputs against configured limits.

Args:
item: Input request item to validate. Can be either:
- dict: Contains prompt and multi_modal_data
- list: Contains messages with multimodal content
limit_mm_per_prompt: Dictionary mapping modality names to their limits
        type_check_mode: How to match content part types:
            - "in": accept "image_url"/"image" and "video_url"/"video" type tags (default)
            - "eq": accept only the exact "image" and "video" type tags

Raises:
ValueError: If input exceeds configured limits or if type_check_mode is invalid
"""
if type_check_mode not in ("in", "eq"):
raise ValueError(f"Invalid type_check_mode: {type_check_mode}. Must be 'in' or 'eq'")

if isinstance(item, dict):
# Request contains prompt and multi_modal_data
mm_data = item
else:
# Request contains messages
mm_data = {"image": [], "video": []}

for message in item:
if isinstance(message.get("content"), list):
for part in message["content"]:
part_type = part.get("type")
if type_check_mode == "in":
if part_type in ["image_url", "image"]:
mm_data["image"].append(part)
elif part_type in ["video_url", "video"]:
mm_data["video"].append(part)
else: # type_check_mode == "eq"
if part_type == "image":
mm_data["image"].append(part)
elif part_type == "video":
mm_data["video"].append(part)

for modality, data in mm_data.items():
if modality in limit_mm_per_prompt:
limit = limit_mm_per_prompt[modality]
if len(data) > limit:
raise ValueError(f"Too many {modality} items in prompt, got {len(data)} but limit is {limit}")


def process_stop_token_ids(
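For reference, a short sketch of how the two `type_check_mode` values behave on the same request; the sample messages and limits below are illustrative, not taken from the repository. `"in"` reproduces the original counting in the PaddleOCR/Qwen VL processors (OpenAI-style `image_url`/`video_url` parts included), while `"eq"` reproduces the Ernie 4.5 VL processors' stricter matching on the bare `image`/`video` type tags:

```python
# Illustrative usage only: the sample messages and limits are made up.
from fastdeploy.input.utils import check_mm_limits

messages = [
    {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": "https://example.com/a.png"}},
            {"type": "image_url", "image_url": {"url": "https://example.com/b.png"}},
            {"type": "text", "text": "Describe both images."},
        ],
    }
]

# "in" mode counts both "image_url" and "image" parts, so two images are found;
# with a limit of 1 this raises ValueError.
try:
    check_mm_limits(messages, {"image": 1}, type_check_mode="in")
except ValueError as e:
    print(e)  # Too many image items in prompt, got 2 but limit is 1

# "eq" mode only counts parts whose type is exactly "image" or "video", so the
# same request passes because the "image_url" parts are not counted.
check_mm_limits(messages, {"image": 1}, type_check_mode="eq")
```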
23 changes: 2 additions & 21 deletions fastdeploy/input/v1/ernie4_5_vl_processor/ernie4_5_vl_processor.py
@@ -20,7 +20,7 @@
from paddleformers.generation import GenerationConfig

from fastdeploy.engine.request import Request
from fastdeploy.input.utils import IDS_TYPE_FLAG, process_stop_token_ids
from fastdeploy.input.utils import IDS_TYPE_FLAG, check_mm_limits, process_stop_token_ids
from fastdeploy.input.v1.ernie4_5_processor import Ernie4_5Processor
from fastdeploy.utils import data_processor_logger

@@ -166,26 +166,7 @@ def _parse_limits(self, limits):
return DEFAULT_LIMITS

def _check_mm_limits(self, item):
if isinstance(item, dict):
# Request contains prompt and multi_modal_data
mm_data = item
else:
# Request contains messages
mm_data = {"image": [], "video": []}

for message in item:
if isinstance(message.get("content"), list):
for part in message["content"]:
if part.get("type") == "image":
mm_data["image"].append(part)
elif part.get("type") == "video":
mm_data["video"].append(part)

for modality, data in mm_data.items():
if modality in self.limit_mm_per_prompt:
limit = self.limit_mm_per_prompt[modality]
if len(data) > limit:
raise ValueError(f"Too many {modality} items in prompt, " f"got {len(data)} but limit is {limit}")
check_mm_limits(item, self.limit_mm_per_prompt, type_check_mode="eq")

def process_request(self, request, max_model_len=None, **kwargs):
"""process the input data"""
@@ -23,7 +23,7 @@
from .process import DataProcessor

_SAMPLING_EPS = 1e-5
from fastdeploy.input.utils import process_stop_token_ids
from fastdeploy.input.utils import check_mm_limits, process_stop_token_ids


class PaddleOCRVLProcessor(TextProcessor):
@@ -171,26 +171,7 @@ def _check_mm_limits(self, item):
Raises:
ValueError: If input exceeds configured limits
"""
if isinstance(item, dict):
# Request contains prompt and multi_modal_data
mm_data = item
else:
# Request contains messages
mm_data = {"image": [], "video": []}

for message in item:
if isinstance(message.get("content"), list):
for part in message["content"]:
if part.get("type") in ["image_url", "image"]:
mm_data["image"].append(part)
elif part.get("type") in ["video_url", "video"]:
mm_data["video"].append(part)

for modality, data in mm_data.items():
if modality in self.limit_mm_per_prompt:
limit = self.limit_mm_per_prompt[modality]
if len(data) > limit:
raise ValueError(f"Too many {modality} items in prompt, " f"got {len(data)} but limit is {limit}")
check_mm_limits(item, self.limit_mm_per_prompt, type_check_mode="in")

def process_request_dict(self, request, max_model_len=None, **kwargs):
"""