56 changes: 33 additions & 23 deletions lmdeploy/metrics/loggers.py
@@ -73,10 +73,10 @@ def record_specdecode(self, stats: SpeculativeDecodingStats):
def record_finish(self, stats: RequestStats):
pass

def log_spec_msg(self):
def get_spec_msg(self):
"""Get spec decoding logging msg."""
if self.num_drafts == 0:
return
return None

draft_acceptance_rate = (self.num_accepted_tokens / self.num_draft_tokens *
100 if self.num_draft_tokens > 0 else float('nan'))
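Note: a quick worked example of the acceptance-rate formula above, with made-up counts rather than values from the PR:

    # Illustrative only: the draft acceptance rate computed in get_spec_msg().
    num_draft_tokens = 400      # e.g. 100 drafts of 4 speculative tokens each
    num_accepted_tokens = 300
    draft_acceptance_rate = (num_accepted_tokens / num_draft_tokens * 100
                             if num_draft_tokens > 0 else float('nan'))
    print(f'{draft_acceptance_rate:.1f}%')  # -> 75.0%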
@@ -97,7 +97,6 @@ def log_spec_msg(self):

def log(self):
now = time.perf_counter()
spec_msg = self.log_spec_msg()

# skip logging if no tokens were processed
if self.total_prompt_tokens == 0 and self.total_generation_tokens == 0:
@@ -108,23 +107,27 @@ def log(self):
prompt_throughput = self.total_prompt_tokens / (now - self.last_log_time)
generation_throughput = self.total_generation_tokens / (now - self.last_log_time)
scheduler_stats = self.last_scheduler_stats
self._reset(now)
scheduler_stats.num_api_waiting_reqs = scheduler_stats.num_total_reqs - \
scheduler_stats.num_completed_reqs - scheduler_stats.num_api_routed_reqs
spec_msg = self.get_spec_msg()

# format and print
log_msg = (f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} "
f'DP{self.dp_rank}] '
f'Avg prompt throughput: {prompt_throughput:.1f} tokens/s, '
f'Avg generation throughput: {generation_throughput:.1f} tokens/s, '
f'Finished: {scheduler_stats.num_finished_reqs} reqs, '
f'Unfinished: {scheduler_stats.num_total_reqs-scheduler_stats.num_finished_reqs} reqs, '
f'Running: {scheduler_stats.num_running_reqs} reqs, '
f'Waiting: {scheduler_stats.num_waiting_reqs} reqs, '
f'GPU KV cache usage: {scheduler_stats.gpu_cache_usage * 100 :.1f}%, '
f'Prefix cache hit rate: {scheduler_stats.prefix_cache_hit_rate * 100 :.1f}%')
log_msg = (
f"[{datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')} DP{self.dp_rank}] "
f'Avg thr (in/out): {prompt_throughput:.1f} / {generation_throughput:.1f} tokens/s, '
f'API server (completed/routed/waiting): {scheduler_stats.num_completed_reqs} / '
f'{scheduler_stats.num_api_routed_reqs} / {scheduler_stats.num_api_waiting_reqs}, '
f'Engine (running/waiting): {scheduler_stats.num_running_reqs} / {scheduler_stats.num_waiting_reqs}, '
f'KV cache: {scheduler_stats.gpu_cache_usage * 100 :.1f}%, ')

if scheduler_stats.prefix_cache_hit_rate != 0:
log_msg += f'Prefix cache hit rate: {scheduler_stats.prefix_cache_hit_rate * 100 :.1f}%, '

if spec_msg is not None:
log_msg += ', ' + spec_msg
log_msg += spec_msg

print(log_msg, flush=True)
self._reset(now)


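Note: for readers comparing the old and new formats, here is a minimal standalone sketch (not part of the PR) of what the reworked log() emits, using made-up values for every field; it also shows how the API-server waiting count is derived from the other counters:

    from datetime import datetime

    dp_rank = 0
    prompt_throughput, generation_throughput = 1523.4, 287.6

    # API-server side counters (illustrative values)
    num_total_reqs, num_completed_reqs, num_api_routed_reqs = 130, 120, 8
    num_api_waiting_reqs = num_total_reqs - num_completed_reqs - num_api_routed_reqs  # 2

    # Engine-core side counters (illustrative values)
    num_running_reqs, num_waiting_reqs = 6, 2
    gpu_cache_usage = 0.431

    log_msg = (f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} DP{dp_rank}] "
               f'Avg thr (in/out): {prompt_throughput:.1f} / {generation_throughput:.1f} tokens/s, '
               f'API server (completed/routed/waiting): {num_completed_reqs} / '
               f'{num_api_routed_reqs} / {num_api_waiting_reqs}, '
               f'Engine (running/waiting): {num_running_reqs} / {num_waiting_reqs}, '
               f'KV cache: {gpu_cache_usage * 100:.1f}%, ')
    print(log_msg, flush=True)
    # -> [2025-01-01 12:00:00 DP0] Avg thr (in/out): 1523.4 / 287.6 tokens/s, API server
    #    (completed/routed/waiting): 120 / 8 / 2, Engine (running/waiting): 6 / 2, KV cache: 43.1%,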
class PrometheusStatLogger(StatLoggerBase):
@@ -154,13 +157,18 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0):
#
# Scheduler stats
#
self.gauge_scheduler_finished = prometheus_client.Gauge(name='lmdeploy:num_requests_finished',
documentation='Number of current finished requests.',
labelnames=labelnames).labels(*labelvalues)
self.gauge_scheduler_completed = prometheus_client.Gauge(name='lmdeploy:num_requests_completed',
documentation='Number of current completed requests.',
labelnames=labelnames).labels(*labelvalues)

self.gauge_scheduler_api_routed = prometheus_client.Gauge(
name='lmdeploy:num_api_requests_routed',
documentation='Number of requests routed to request handles.',
labelnames=labelnames).labels(*labelvalues)

self.gauge_scheduler_unfinished = prometheus_client.Gauge(
name='lmdeploy:num_requests_unfinished',
documentation='Number of current unfinished requests.',
self.gauge_scheduler_api_waiting = prometheus_client.Gauge(
name='lmdeploy:num_api_requests_waiting',
documentation='Number of requests waiting for free request handles.',
labelnames=labelnames).labels(*labelvalues)

self.gauge_scheduler_running = prometheus_client.Gauge(
@@ -300,8 +308,10 @@ def __init__(self, model_name: str, max_model_len: int, dp_rank: int = 0):

def record_schedule(self, stats: SchedulerStats) -> None:
"""Report schedule metrics to prometheus."""
self.gauge_scheduler_finished.set(stats.num_finished_reqs)
self.gauge_scheduler_unfinished.set(stats.num_total_reqs - stats.num_finished_reqs)
self.gauge_scheduler_completed.set(stats.num_completed_reqs)
self.gauge_scheduler_api_routed.set(stats.num_api_routed_reqs)
self.gauge_scheduler_api_waiting.set(stats.num_total_reqs - stats.num_completed_reqs -
stats.num_api_routed_reqs)
self.gauge_scheduler_running.set(stats.num_running_reqs)
self.gauge_scheduler_waiting.set(stats.num_waiting_reqs)
self.gauge_gpu_cache_usage.set(stats.gpu_cache_usage)
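Note: each SchedulerStats counter is exposed as one labelled Gauge that record_schedule() overwrites on every tick. A rough sketch of that pattern for the new API-server waiting metric (assumes prometheus_client is installed; the label set here is illustrative, not necessarily the one LMDeploy uses):

    import prometheus_client

    labelnames = ['model_name', 'engine']   # assumed labels, for illustration only
    labelvalues = ['my-model', '0']

    gauge_api_waiting = prometheus_client.Gauge(
        name='lmdeploy:num_api_requests_waiting',
        documentation='Number of requests waiting for free request handles.',
        labelnames=labelnames).labels(*labelvalues)

    # On each schedule tick, derive waiting from the other API-server counters and publish it.
    num_total_reqs, num_completed_reqs, num_api_routed_reqs = 130, 120, 8
    gauge_api_waiting.set(num_total_reqs - num_completed_reqs - num_api_routed_reqs)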
18 changes: 13 additions & 5 deletions lmdeploy/metrics/metrics_processor.py
@@ -93,13 +93,21 @@ def queue_update(self, update_data: tuple):
return
self.metrics_queue.put_nowait(update_data)

def increment_total_requests(self):
"""Increment total requests."""
def increase_total_requests(self):
"""Increase total requests."""
self.scheduler_stats.num_total_reqs += 1

def increment_finished_requests(self):
"""Increment finished requests."""
self.scheduler_stats.num_finished_reqs += 1
def increase_completed_requests(self):
"""Increase completed requests."""
self.scheduler_stats.num_completed_reqs += 1

def increase_api_routed_requests(self):
"""Increase API routed requests."""
self.scheduler_stats.num_api_routed_reqs += 1

def decrease_api_routed_requests(self):
"""Decrease API routed requests."""
self.scheduler_stats.num_api_routed_reqs -= 1


metrics_processor = MetricsProcessor()
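Note: a simplified, synchronous sketch (not the real serving code, which is asynchronous and lives in async_engine.py below) of how the renamed counters are meant to be driven over one request's lifetime:

    from lmdeploy.metrics.metrics_processor import metrics_processor

    metrics_processor.increase_total_requests()        # request received by the API server
    metrics_processor.increase_api_routed_requests()   # request handle acquired, inference starts
    try:
        pass                                           # stream tokens back to the client here
    finally:
        metrics_processor.decrease_api_routed_requests()  # handle released
    metrics_processor.increase_completed_requests()    # request finished end-to-end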
25 changes: 19 additions & 6 deletions lmdeploy/metrics/stats.py
@@ -13,18 +13,29 @@
@dataclass
class SchedulerStats:
"""Stats associated with the scheduler.
Desc:
Dataflow: client --> API server --> Engine core
API server total = completed + uncompleted = completed + (api_routed + api_waiting)
Engine core total = running + waiting = api_routed

Attributes:
num_total_reqs: The number of all requests received since server start.
num_finished_reqs: The number of successfully completed requests since server start.
num_running_reqs: Currently executing requests.
num_waiting_reqs: Requests queued waiting for execution.
num_total_reqs: API server, the number of all requests received since server start.
num_completed_reqs: API server, the number of successfully completed requests since server start.
num_api_routed_reqs: API server, the number of requests routed to request handles.
num_api_waiting_reqs: API server, the number of requests waiting for free request handles.
num_running_reqs: Engine core, currently executing requests.
num_waiting_reqs: Engine core, requests queued waiting for execution.
gpu_cache_usage: Fraction of GPU KV blocks utilized (0.0 to 1.0).
prefix_cache_hit_rate: Prefix caching hit rate.
"""

# api server
num_total_reqs: int = 0
num_finished_reqs: int = 0
num_completed_reqs: int = 0
num_api_routed_reqs: int = 0
num_api_waiting_reqs: int = 0

# engine core
num_running_reqs: int = 0
num_waiting_reqs: int = 0
gpu_cache_usage: float = 0.0
@@ -33,7 +44,9 @@ class SchedulerStats:
def __repr__(self):
return ('SchedulerStats(\n'
f' num_total_reqs={self.num_total_reqs},\n'
f' num_finished_reqs={self.num_finished_reqs},\n'
f' num_completed_reqs={self.num_completed_reqs},\n'
f' num_api_routed_reqs={self.num_api_routed_reqs},\n'
f' num_api_waiting_reqs={self.num_api_waiting_reqs},\n'
f' num_running_reqs={self.num_running_reqs},\n'
f' num_waiting_reqs={self.num_waiting_reqs},\n'
f' gpu_cache_usage={self.gpu_cache_usage:.6f},\n'
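Note: a small illustrative check (made-up numbers) of the bookkeeping relations stated in the docstring above:

    from lmdeploy.metrics.stats import SchedulerStats

    # num_total_reqs = num_completed_reqs + num_api_routed_reqs + num_api_waiting_reqs
    # num_api_routed_reqs = num_running_reqs + num_waiting_reqs   (engine-core view)
    stats = SchedulerStats(num_total_reqs=130,
                           num_completed_reqs=120,
                           num_api_routed_reqs=8,
                           num_api_waiting_reqs=2,
                           num_running_reqs=6,
                           num_waiting_reqs=2)
    assert stats.num_total_reqs == (stats.num_completed_reqs + stats.num_api_routed_reqs +
                                    stats.num_api_waiting_reqs)
    assert stats.num_api_routed_reqs == stats.num_running_reqs + stats.num_waiting_reqs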
11 changes: 6 additions & 5 deletions lmdeploy/serve/core/async_engine.py
@@ -265,6 +265,7 @@ def _determine_gen_config(self, session, input_ids, gen_config: GenerationConfig
async def safe_run(self, handle, session, **kwargs):
generator = handle.async_stream_infer(session.session_id, **kwargs)
try:
metrics_processor.increase_api_routed_requests()
yield generator
except (Exception, asyncio.CancelledError, GeneratorExit) as e: # noqa
logger.error(f'[safe_run] session {session.session_id} exception caught: {type(e).__name__} {e}')
@@ -274,6 +275,7 @@ async def safe_run(self, handle, session, **kwargs):
raise SafeRunException(f'Safe run exception for session {session.session_id}') from e
finally:
await generator.aclose()
metrics_processor.decrease_api_routed_requests()

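Note: the intent of the paired calls above is that num_api_routed_reqs goes up exactly once when safe_run hands out the inference generator and is guaranteed to come back down in the finally block, whichever way the generator exits (normal completion, client error, or cancellation). A stripped-down sketch of that pattern, with a hypothetical helper name rather than the actual handle API:

    from contextlib import asynccontextmanager

    from lmdeploy.metrics.metrics_processor import metrics_processor

    @asynccontextmanager
    async def track_routed_request(generator):
        """Keep num_api_routed_reqs balanced around one streaming inference call."""
        try:
            metrics_processor.increase_api_routed_requests()
            yield generator
        finally:
            # Runs on normal exit, exceptions, and asyncio.CancelledError alike.
            await generator.aclose()
            metrics_processor.decrease_api_routed_requests()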
async def generate(
self,
@@ -389,13 +391,12 @@ def is_error(status):
if not gen_config.ignore_eos:
stop_ids = gen_config.stop_token_ids or []

metrics_processor.increment_total_requests()

metrics_processor.increase_total_requests()
async with session.request_handle() as handle:
if epoch != self.epoch:
logger.debug(f'[generate] session {session_id} got aborted before starting inference')
# TODO(lvhan): metrics_processor.increment_failed_requests('abort')
metrics_processor.increment_finished_requests()
# TODO(lvhan): metrics_processor.increase_failed_requests('abort')
metrics_processor.increase_completed_requests()
yield GenOut(response='',
history_token_len=0,
input_token_len=len(input_ids),
@@ -467,7 +468,7 @@ def is_error(status):
out.logits = (outputs.logits[:-hit_stop_token] if hit_stop_token else outputs.logits)
yield out
# end of generator loop
metrics_processor.increment_finished_requests()
metrics_processor.increase_completed_requests()

if not is_error(outputs.status):
if outputs.status == ResponseType.CANCEL: