31 changes: 17 additions & 14 deletions lmdeploy/metrics/loggers.py
@@ -106,18 +106,17 @@ def log(self):
prompt_throughput = self.total_prompt_tokens / (now - self.last_log_time)
generation_throughput = self.total_generation_tokens / (now - self.last_log_time)
scheduler_stats = self.last_scheduler_stats
- scheduler_stats.num_api_waiting_reqs = scheduler_stats.num_total_reqs - \
- scheduler_stats.num_completed_reqs - scheduler_stats.num_api_routed_reqs
spec_msg = self.get_spec_msg()

# format and print
- log_msg = (
- f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} DP{self.dp_rank}] "
- f'Avg thr (in/out): {prompt_throughput:.1f} / {generation_throughput:.1f} tokens/s, '
- f'API server (completed/routed/waiting): {scheduler_stats.num_completed_reqs} / '
- f'{scheduler_stats.num_api_routed_reqs} / {scheduler_stats.num_api_waiting_reqs}, '
- f'Engine (running/waiting): {scheduler_stats.num_running_reqs} / {scheduler_stats.num_waiting_reqs}, '
- f'KV cache: {scheduler_stats.gpu_cache_usage * 100 :.1f}%, ')
+ log_msg = (f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} DP{self.dp_rank}] "
+ f'Avg thr (in/out): {prompt_throughput:.1f} / {generation_throughput:.1f} tokens/s, '
+ f'Server (succeeded/failed/routed/waiting): '
+ f'{scheduler_stats.num_succeeded_reqs} / {scheduler_stats.num_failed_reqs} / '
+ f'{scheduler_stats.num_api_routed_reqs} / {scheduler_stats.num_api_waiting_reqs}, '
+ f'Engine (running/waiting): '
+ f'{scheduler_stats.num_running_reqs} / {scheduler_stats.num_waiting_reqs}, '
+ f'KV cache: {scheduler_stats.gpu_cache_usage * 100 :.1f}%, ')

if scheduler_stats.prefix_cache_hit_rate != 0:
log_msg += f'Prefix cache hit rate: {scheduler_stats.prefix_cache_hit_rate * 100 :.1f}%, '
@@ -156,10 +155,14 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0):
#
# Scheduler stats
#
- self.gauge_scheduler_completed = prometheus_client.Gauge(name='lmdeploy:num_requests_completed',
- documentation='Number of current completed requests.',
+ self.gauge_scheduler_succeeded = prometheus_client.Gauge(name='lmdeploy:num_requests_succeeded',
+ documentation='Number of current succeeded requests.',
labelnames=labelnames).labels(*labelvalues)

+ self.gauge_scheduler_failed = prometheus_client.Gauge(name='lmdeploy:num_requests_failed',
+ documentation='Number of current failed requests.',
+ labelnames=labelnames).labels(*labelvalues)

self.gauge_scheduler_api_routed = prometheus_client.Gauge(
name='lmdeploy:num_api_requests_routed',
documentation='Number of requests routed to request handles.',
@@ -307,10 +310,10 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0):

def record_schedule(self, stats: SchedulerStats) -> None:
"""Report schedule metrics to prometheus."""
- self.gauge_scheduler_completed.set(stats.num_completed_reqs)
+ self.gauge_scheduler_succeeded.set(stats.num_succeeded_reqs)
+ self.gauge_scheduler_failed.set(stats.num_failed_reqs)
self.gauge_scheduler_api_routed.set(stats.num_api_routed_reqs)
- self.gauge_scheduler_api_waiting.set(stats.num_total_reqs - stats.num_completed_reqs -
- stats.num_api_routed_reqs)
+ self.gauge_scheduler_api_waiting.set(stats.num_api_waiting_reqs)
self.gauge_scheduler_running.set(stats.num_running_reqs)
self.gauge_scheduler_waiting.set(stats.num_waiting_reqs)
self.gauge_gpu_cache_usage.set(stats.gpu_cache_usage)
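Side note on the gauge pattern used in this file: with prometheus_client, a Gauge is declared once with its label names, bound to concrete label values via `.labels()`, and then overwritten with `.set()` on every scheduling report. A minimal, self-contained sketch of the new succeeded/failed gauges follows; the metric names and documentation strings are taken from the diff above, but the label names and values are placeholders, not the ones lmdeploy actually uses.

```python
# Minimal sketch of the gauge pattern in this PR.
# The labelnames/labelvalues below are hypothetical; the real ones come from
# the logger's __init__, which is not shown in this hunk.
import prometheus_client

labelnames = ['model_name', 'dp_rank']   # hypothetical label names
labelvalues = ['my-model', '0']          # hypothetical label values

gauge_succeeded = prometheus_client.Gauge(name='lmdeploy:num_requests_succeeded',
                                          documentation='Number of current succeeded requests.',
                                          labelnames=labelnames).labels(*labelvalues)
gauge_failed = prometheus_client.Gauge(name='lmdeploy:num_requests_failed',
                                       documentation='Number of current failed requests.',
                                       labelnames=labelnames).labels(*labelvalues)

# On each scheduling report the gauges are simply overwritten with the
# latest counts (made-up numbers here).
gauge_succeeded.set(80)
gauge_failed.set(6)
```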
15 changes: 12 additions & 3 deletions lmdeploy/metrics/metrics_processor.py
@@ -97,9 +97,18 @@ def increase_total_requests(self):
"""Increase total requests."""
self.scheduler_stats.num_total_reqs += 1

- def increase_completed_requests(self):
- """Increase completed requests."""
- self.scheduler_stats.num_completed_reqs += 1
+ def increase_succeeded_requests(self):
+ """Increase succeeded requests."""
+ self.scheduler_stats.num_succeeded_reqs += 1

+ def increase_failed_requests(self, failure_type: str = 'error'):
+ """Increase failed requests."""
+ if failure_type == 'cancel':
+ self.scheduler_stats.num_cancelled_reqs += 1
+ elif failure_type == 'abort':
+ self.scheduler_stats.num_aborted_reqs += 1
+ elif failure_type == 'error':
+ self.scheduler_stats.num_errored_reqs += 1

def increase_api_routed_requests(self):
"""Increase API routed requests."""
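The failure_type dispatch above is a plain if/elif chain, so an unrecognized value is silently ignored. Purely as an illustration of an alternative design (this is not what the PR does), the same bookkeeping can be written with a lookup table so that unknown failure types fail loudly; `_Stats` and `_Processor` below are trimmed, hypothetical stand-ins for the real SchedulerStats and metrics processor.

```python
# Illustrative alternative, not part of the PR: table-driven failure accounting.
from dataclasses import dataclass


@dataclass
class _Stats:
    num_cancelled_reqs: int = 0
    num_aborted_reqs: int = 0
    num_errored_reqs: int = 0


_FAILURE_COUNTERS = {
    'cancel': 'num_cancelled_reqs',
    'abort': 'num_aborted_reqs',
    'error': 'num_errored_reqs',
}


class _Processor:

    def __init__(self):
        self.scheduler_stats = _Stats()

    def increase_failed_requests(self, failure_type: str = 'error') -> None:
        """Increase the counter matching failure_type; reject unknown types."""
        attr = _FAILURE_COUNTERS.get(failure_type)
        if attr is None:
            raise ValueError(f'unknown failure_type: {failure_type!r}')
        setattr(self.scheduler_stats, attr, getattr(self.scheduler_stats, attr) + 1)


p = _Processor()
p.increase_failed_requests('abort')
assert p.scheduler_stats.num_aborted_reqs == 1
```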
53 changes: 44 additions & 9 deletions lmdeploy/metrics/stats.py
@@ -14,14 +14,27 @@ class SchedulerStats:
"""Stats associated with the scheduler.
Desc:
Dataflow: client --> API server --> Engine core
- API server total = completed + uncompleted = completed + (api_routed + api_waiting)
- Engine core total = running + waiting = api_routed

+ API server request states (axis view):
+ |<──────────────────────────────── total ────────────────────────────────>|
+ |<──────────── completed ─────────────>|<────── uncompleted ─────────────>|
+ |<─ success ─>|<──────── fail ────────>|<─ routed ─>|<───── waiting ─────>|
+ |<cancel>|<abort>|<error>|

+ Engine core request states (axis view):
+ |<────────────────── routed ──────────────────>|
+ |<───── running ──────>|<────── waiting ──────>|

Attributes:
- num_total_reqs: API server, the number of all requests received since server start.
- num_completed_reqs: API server, the number of successfully completed requests since server start.
- num_api_routed_reqs: API server, the number of requests routed to request handles.
- num_api_waiting_reqs: API server, the number of requests waiting for free request handles.
+ # API server
+ num_total_reqs: the number of all requests received since server start.
+ num_succeeded_reqs: the number of successfully completed requests since server start.
+ num_cancelled_reqs: the number of cancelled requests since server start.
+ num_aborted_reqs: the number of aborted requests since server start.
+ num_errored_reqs: the number of requests that end with errors since server start.
+ num_api_routed_reqs: the number of requests routed to request handles.

+ # Engine core
num_running_reqs: Engine core, currently executing requests.
num_waiting_reqs: Engine core, requests queued waiting for execution.
gpu_cache_usage: Fraction of GPU KV blocks utilized (0.0 to 1.0).
@@ -30,20 +43,42 @@ class SchedulerStats:

# api server
num_total_reqs: int = 0
- num_completed_reqs: int = 0
+ num_succeeded_reqs: int = 0
+ num_cancelled_reqs: int = 0
+ num_aborted_reqs: int = 0
+ num_errored_reqs: int = 0
num_api_routed_reqs: int = 0
- num_api_waiting_reqs: int = 0

# engine core
num_running_reqs: int = 0
num_waiting_reqs: int = 0
gpu_cache_usage: float = 0.0
prefix_cache_hit_rate: float = 0.0

+ @property
+ def num_failed_reqs(self) -> int:
+ return self.num_cancelled_reqs + self.num_aborted_reqs + self.num_errored_reqs

+ @property
+ def num_completed_reqs(self) -> int:
+ return self.num_succeeded_reqs + self.num_failed_reqs

+ @property
+ def num_uncompleted_reqs(self) -> int:
+ return self.num_total_reqs - self.num_completed_reqs

+ @property
+ def num_api_waiting_reqs(self) -> int:
+ """The number of requests waiting for free request handles."""
+ return self.num_uncompleted_reqs - self.num_api_routed_reqs

def __repr__(self):
return ('SchedulerStats(\n'
f' num_total_reqs={self.num_total_reqs},\n'
- f' num_completed_reqs={self.num_completed_reqs},\n'
+ f' num_succeeded_reqs={self.num_succeeded_reqs},\n'
+ f' num_cancelled_reqs={self.num_cancelled_reqs},\n'
+ f' num_aborted_reqs={self.num_aborted_reqs},\n'
+ f' num_errored_reqs={self.num_errored_reqs},\n'
f' num_api_routed_reqs={self.num_api_routed_reqs},\n'
f' num_api_waiting_reqs={self.num_api_waiting_reqs},\n'
f' num_running_reqs={self.num_running_reqs},\n'
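The derived properties encode the axis view from the docstring: every request the API server has received is, at any instant, exactly one of succeeded, cancelled, aborted, errored, routed, or waiting. A minimal, self-contained sketch follows; `MiniSchedulerStats` is a trimmed stand-in for SchedulerStats with only the API-server counters, and the numbers are made up.

```python
from dataclasses import dataclass


# Trimmed stand-in for SchedulerStats, showing how the derived properties
# tie the counters together (made-up numbers).
@dataclass
class MiniSchedulerStats:
    num_total_reqs: int = 0
    num_succeeded_reqs: int = 0
    num_cancelled_reqs: int = 0
    num_aborted_reqs: int = 0
    num_errored_reqs: int = 0
    num_api_routed_reqs: int = 0

    @property
    def num_failed_reqs(self) -> int:
        return self.num_cancelled_reqs + self.num_aborted_reqs + self.num_errored_reqs

    @property
    def num_completed_reqs(self) -> int:
        return self.num_succeeded_reqs + self.num_failed_reqs

    @property
    def num_uncompleted_reqs(self) -> int:
        return self.num_total_reqs - self.num_completed_reqs

    @property
    def num_api_waiting_reqs(self) -> int:
        return self.num_uncompleted_reqs - self.num_api_routed_reqs


stats = MiniSchedulerStats(num_total_reqs=100, num_succeeded_reqs=80,
                           num_cancelled_reqs=3, num_aborted_reqs=2,
                           num_errored_reqs=1, num_api_routed_reqs=10)
assert stats.num_failed_reqs == 6       # cancel + abort + error
assert stats.num_completed_reqs == 86   # succeeded + failed
assert stats.num_api_waiting_reqs == 4  # total - completed - routed
```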
8 changes: 5 additions & 3 deletions lmdeploy/serve/core/async_engine.py
@@ -279,6 +279,7 @@ async def safe_run(self, handle, session, **kwargs):
yield generator
except (Exception, asyncio.CancelledError, GeneratorExit) as e: # noqa
logger.exception(f'[safe_run] session {session.session_id} exception caught: {e}')
+ metrics_processor.increase_failed_requests('cancel')
# Use asyncio.shield to protect cleanup coroutines from being cancelled.
# When a task is in cancelling state, bare `await` raises CancelledError
# immediately. shield ensures the inner coroutine runs to completion.
@@ -424,8 +425,7 @@ def is_error(status):
async with session.request_handle() as handle:
if epoch != self.epoch:
logger.info(f'[generate] session {session_id} got aborted before starting inference')
- # TODO(lvhan): metrics_processor.increase_failed_requests('abort')
- metrics_processor.increase_completed_requests()
+ metrics_processor.increase_failed_requests('abort')
yield GenOut(response='',
history_token_len=0,
input_token_len=len(input_ids),
@@ -501,13 +501,14 @@ def is_error(status):
out.logits = (outputs.logits[:-hit_stop_token] if hit_stop_token else outputs.logits)
yield out
# end of generator loop
- metrics_processor.increase_completed_requests()

if not is_error(outputs.status):
if outputs.status == ResponseType.CANCEL:
finish_reason = 'abort'
+ metrics_processor.increase_failed_requests('abort')
else:
finish_reason = 'stop' if outputs.token_ids[-1] in stop_ids else 'length'
+ metrics_processor.increase_succeeded_requests()

# utf-8 char at the end means it's a potential unfinished byte sequence
if not response.endswith('�'):
@@ -547,6 +548,7 @@ def is_error(status):
else:
logger.error(f'session {session_id} finished, {outputs.status}, '
'reason "error"')
+ metrics_processor.increase_failed_requests('error')
yield GenOut(response=f'internal error happened, status code {outputs.status}',
history_token_len=session.step,
input_token_len=len(input_ids),
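Taken together, the async_engine changes give every request exit path exactly one terminal counter: 'cancel' when the generator is torn down in safe_run, 'abort' when the session is aborted before inference starts or the engine returns a CANCEL status, 'error' for error statuses, and succeeded for a normal stop/length finish. A schematic summary follows; it is not the actual lmdeploy code (which is spread across safe_run and generate), and `outcome`/`metrics` are hypothetical stand-ins for the real control flow.

```python
# Schematic only: how request outcomes map onto the new counters after this PR.
def record_outcome(outcome: str, metrics) -> None:
    if outcome == 'cancelled':      # exception/cancellation caught in safe_run
        metrics.increase_failed_requests('cancel')
    elif outcome == 'aborted':      # aborted before inference, or CANCEL status
        metrics.increase_failed_requests('abort')
    elif outcome == 'error':        # engine reported an error status
        metrics.increase_failed_requests('error')
    else:                           # finish_reason 'stop' or 'length'
        metrics.increase_succeeded_requests()
```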