Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions livekit-agents/livekit/agents/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from ..voice import AgentSession, io
from ..voice.run_result import RunEvent
from ..voice.transcription import TranscriptSynchronizer
from ..worker import AgentServer, WorkerOptions
from ..worker import AgentServer, ServerEnvOption, WorkerOptions
from . import proto
from .log import JsonFormatter, _merge_record_extra, _silence_noisy_loggers

Expand Down Expand Up @@ -1493,13 +1493,14 @@ def _run_console(
output_device: str | None,
mode: ConsoleMode,
record: bool,
log_level: int | str = logging.DEBUG,
) -> None:
c = AgentsConsole.get_instance()
c.console_mode = mode
c.enabled = True
c.record = record

_configure_logger(c, logging.DEBUG)
_configure_logger(c, log_level)
c.print("Starting console mode 🚀", tag="Agents")

if c.record:
Expand Down Expand Up @@ -1662,6 +1663,9 @@ class LogLevel(str, enum.Enum):
def _build_cli(server: AgentServer) -> typer.Typer:
app = typer.Typer(rich_markup_mode="rich")

_start_log_default = LogLevel(ServerEnvOption.getvalue(server.log_level, False))
_dev_log_default = LogLevel(ServerEnvOption.getvalue(server.log_level, True))

@app.command()
def console(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add log_level for console mode as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*,
Expand All @@ -1688,6 +1692,12 @@ def console(
typer.Option(help="Whether to start the console in text mode"),
] = False,
record: Annotated[bool, typer.Option(help="Whether to record the AgentSession")] = False,
log_level: Annotated[
LogLevel,
typer.Option(
help="Set the log level", case_sensitive=False, envvar="LIVEKIT_LOG_LEVEL"
),
] = _dev_log_default,
) -> None:
"""
Run a [bold]LiveKit Agents[/bold] in [yellow]console[/yellow] mode.
Expand All @@ -1708,15 +1718,18 @@ def console(
output_device=output_device,
mode="text" if text else "audio",
record=record,
log_level=log_level.value,
)

@app.command()
def start(
*,
log_level: Annotated[
LogLevel,
typer.Option(help="Set the log level", case_sensitive=False),
] = LogLevel.info,
typer.Option(
help="Set the log level", case_sensitive=False, envvar="LIVEKIT_LOG_LEVEL"
),
] = _start_log_default,
url: Annotated[
str | None, # noqa: UP007
typer.Option(
Expand Down Expand Up @@ -1763,8 +1776,10 @@ def dev(
*,
log_level: Annotated[
LogLevel,
typer.Option(help="Set the log level", case_sensitive=False),
] = LogLevel.debug,
typer.Option(
help="Set the log level", case_sensitive=False, envvar="LIVEKIT_LOG_LEVEL"
),
] = _dev_log_default,
reload: Annotated[
bool,
typer.Option(help="Enable auto-reload of the server when (code) files change."),
Expand Down
42 changes: 42 additions & 0 deletions livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,33 @@ def getvalue(opt: T | ServerEnvOption[T], devmode: bool) -> T:


_default_load_threshold = ServerEnvOption(dev_default=math.inf, prod_default=0.7)
_default_log_level = ServerEnvOption(dev_default="DEBUG", prod_default="INFO")
_default_permissions = WorkerPermissions()

VALID_LOG_LEVELS = frozenset({"TRACE", "DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"})


def _validate_and_normalize_log_level(
log_level: str | ServerEnvOption[str],
) -> str | ServerEnvOption[str]:
if isinstance(log_level, ServerEnvOption):
levels_to_check = [log_level.dev_default, log_level.prod_default]
else:
levels_to_check = [log_level]

for level in levels_to_check:
if level.upper() not in VALID_LOG_LEVELS:
raise ValueError(
f"Invalid log level {level!r}. Valid levels: {', '.join(sorted(VALID_LOG_LEVELS))}"
)
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

if isinstance(log_level, ServerEnvOption):
return ServerEnvOption(
dev_default=log_level.dev_default.upper(),
prod_default=log_level.prod_default.upper(),
)
return log_level.upper()


# NOTE: this object must be pickle-able
@dataclass
Expand Down Expand Up @@ -206,6 +231,13 @@ class ServerOptions:

By default it uses ``LIVEKIT_API_SECRET`` from environment"""

log_level: str | ServerEnvOption[str] = _default_log_level
"""Log level for the worker.

Defaults to ``DEBUG`` in development mode and ``INFO`` in production mode.
Can also be set via the ``LIVEKIT_LOG_LEVEL`` environment variable or
the ``--log-level`` CLI argument (CLI takes highest precedence)."""

host: str = "" # default to all interfaces
port: int | ServerEnvOption[int] = ServerEnvOption(dev_default=0, prod_default=8081)
"""Port for local HTTP server to listen on.
Expand Down Expand Up @@ -233,6 +265,9 @@ class ServerOptions:
When None (default), multiprocess mode is disabled and only main process metrics are collected.
Users can also set PROMETHEUS_MULTIPROC_DIR environment variable directly before starting the worker."""

def __post_init__(self) -> None:
self.log_level = _validate_and_normalize_log_level(self.log_level)

def validate_config(self, devmode: bool) -> None:
load_threshold = ServerEnvOption.getvalue(self.load_threshold, devmode)
if load_threshold > 1 and not devmode:
Expand Down Expand Up @@ -285,6 +320,7 @@ def __init__(
load_fnc: Callable[[AgentServer], float] | Callable[[], float] | None = None,
prometheus_port: int | None = None,
prometheus_multiproc_dir: str | None = None,
log_level: str | ServerEnvOption[str] = _default_log_level,
) -> None:
super().__init__()
self._ws_url = ws_url or os.environ.get("LIVEKIT_URL") or ""
Expand Down Expand Up @@ -314,6 +350,7 @@ def __init__(
http_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("HTTP_PROXY")

self._http_proxy = http_proxy
self._log_level = _validate_and_normalize_log_level(log_level)
self._agent_name = ""
self._server_type = ServerType.ROOM
self._id = "unregistered"
Expand All @@ -337,6 +374,10 @@ def __init__(

self._lock = asyncio.Lock()

@property
def log_level(self) -> str | ServerEnvOption[str]:
return self._log_level

@property
def setup_fnc(self) -> Callable[[JobProcess], Any] | None:
return self._setup_fnc
Expand Down Expand Up @@ -380,6 +421,7 @@ def from_server_options(cls, options: ServerOptions) -> AgentServer:
prometheus_port=options.prometheus_port if is_given(options.prometheus_port) else None,
setup_fnc=options.prewarm_fnc,
load_fnc=options.load_fnc,
log_level=options.log_level,
)
server.rtc_session(
options.entrypoint_fnc,
Expand Down
200 changes: 200 additions & 0 deletions tests/test_cli_log_level.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from __future__ import annotations

from unittest.mock import patch

import pytest
from typer.testing import CliRunner

from livekit.agents.cli.cli import _build_cli
from livekit.agents.worker import AgentServer, ServerEnvOption, ServerOptions


def _make_server(**kwargs) -> AgentServer:
async def _fake_entrypoint(ctx):
pass

opts = ServerOptions(entrypoint_fnc=_fake_entrypoint, **kwargs)
return AgentServer.from_server_options(opts)


@pytest.fixture
def runner():
return CliRunner()


class TestServerOptionsLogLevel:
def test_default_uses_server_env_option(self):
opts = ServerOptions(entrypoint_fnc=lambda ctx: None)
assert isinstance(opts.log_level, ServerEnvOption)
assert ServerEnvOption.getvalue(opts.log_level, False) == "INFO"
assert ServerEnvOption.getvalue(opts.log_level, True) == "DEBUG"

def test_custom_string_log_level(self):
opts = ServerOptions(entrypoint_fnc=lambda ctx: None, log_level="WARN")
assert opts.log_level == "WARN"
assert ServerEnvOption.getvalue(opts.log_level, False) == "WARN"
assert ServerEnvOption.getvalue(opts.log_level, True) == "WARN"

def test_custom_server_env_option(self):
opts = ServerOptions(
entrypoint_fnc=lambda ctx: None,
log_level=ServerEnvOption(dev_default="ERROR", prod_default="CRITICAL"),
)
assert ServerEnvOption.getvalue(opts.log_level, False) == "CRITICAL"
assert ServerEnvOption.getvalue(opts.log_level, True) == "ERROR"


class TestAgentServerLogLevel:
def test_default_log_level(self):
server = AgentServer()
assert isinstance(server.log_level, ServerEnvOption)
assert ServerEnvOption.getvalue(server.log_level, False) == "INFO"
assert ServerEnvOption.getvalue(server.log_level, True) == "DEBUG"

def test_custom_log_level(self):
server = AgentServer(log_level="WARN")
assert server.log_level == "WARN"

def test_from_server_options_passes_log_level(self):
server = _make_server(log_level="ERROR")
assert server.log_level == "ERROR"


class TestStartCommandLogLevel:
@patch("livekit.agents.cli.cli._run_worker")
def test_default_log_level_is_info(self, mock_run_worker, runner):
server = _make_server()
app = _build_cli(server)
result = runner.invoke(app, ["start"])
assert result.exit_code == 0
args = mock_run_worker.call_args
assert args.kwargs["args"].log_level == "INFO"

@patch("livekit.agents.cli.cli._run_worker")
def test_cli_arg_overrides_default(self, mock_run_worker, runner):
server = _make_server()
app = _build_cli(server)
result = runner.invoke(app, ["start", "--log-level", "error"])
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "ERROR"

@patch("livekit.agents.cli.cli._run_worker")
def test_env_var_overrides_default(self, mock_run_worker, runner):
server = _make_server()
app = _build_cli(server)
result = runner.invoke(app, ["start"], env={"LIVEKIT_LOG_LEVEL": "CRITICAL"})
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "CRITICAL"

@patch("livekit.agents.cli.cli._run_worker")
def test_cli_arg_overrides_env_var(self, mock_run_worker, runner):
server = _make_server()
app = _build_cli(server)
result = runner.invoke(
app,
["start", "--log-level", "warn"],
env={"LIVEKIT_LOG_LEVEL": "CRITICAL"},
)
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "WARN"

@patch("livekit.agents.cli.cli._run_worker")
def test_server_opts_log_level_used_when_no_cli_or_env(self, mock_run_worker, runner):
server = _make_server(log_level="ERROR")
app = _build_cli(server)
result = runner.invoke(app, ["start"])
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "ERROR"

@patch("livekit.agents.cli.cli._run_worker")
def test_cli_arg_overrides_server_opts(self, mock_run_worker, runner):
server = _make_server(log_level="ERROR")
app = _build_cli(server)
result = runner.invoke(app, ["start", "--log-level", "debug"])
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "DEBUG"


class TestDevCommandLogLevel:
@patch("livekit.agents.cli.cli._run_worker")
def test_default_log_level_is_debug(self, mock_run_worker, runner):
server = _make_server()
app = _build_cli(server)
result = runner.invoke(app, ["dev", "--no-reload"])
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "DEBUG"

@patch("livekit.agents.cli.cli._run_worker")
def test_cli_arg_overrides_default(self, mock_run_worker, runner):
server = _make_server()
app = _build_cli(server)
result = runner.invoke(app, ["dev", "--no-reload", "--log-level", "info"])
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "INFO"

@patch("livekit.agents.cli.cli._run_worker")
def test_env_var_overrides_default(self, mock_run_worker, runner):
server = _make_server()
app = _build_cli(server)
result = runner.invoke(app, ["dev", "--no-reload"], env={"LIVEKIT_LOG_LEVEL": "ERROR"})
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "ERROR"

@patch("livekit.agents.cli.cli._run_worker")
def test_server_opts_log_level_used_when_no_cli_or_env(self, mock_run_worker, runner):
server = _make_server(log_level="CRITICAL")
app = _build_cli(server)
result = runner.invoke(app, ["dev", "--no-reload"])
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "CRITICAL"


class TestLogLevelValidation:
def test_invalid_string_rejected_by_server_options(self):
with pytest.raises(ValueError, match="Invalid log level"):
ServerOptions(entrypoint_fnc=lambda ctx: None, log_level="WARNING")

def test_invalid_string_rejected_by_agent_server(self):
with pytest.raises(ValueError, match="Invalid log level"):
AgentServer(log_level="NOTSET")

def test_invalid_server_env_option_rejected(self):
with pytest.raises(ValueError, match="Invalid log level"):
ServerOptions(
entrypoint_fnc=lambda ctx: None,
log_level=ServerEnvOption(dev_default="WARNING", prod_default="INFO"),
)

def test_valid_levels_accepted_by_server_options(self):
for level in ("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"):
opts = ServerOptions(entrypoint_fnc=lambda ctx: None, log_level=level)
assert opts.log_level == level

def test_valid_levels_accepted_by_agent_server(self):
for level in ("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"):
server = AgentServer(log_level=level)
assert server.log_level == level

def test_lowercase_normalized_to_uppercase_server_options(self):
opts = ServerOptions(entrypoint_fnc=lambda ctx: None, log_level="info")
assert opts.log_level == "INFO"

def test_lowercase_normalized_to_uppercase_agent_server(self):
server = AgentServer(log_level="debug")
assert server.log_level == "DEBUG"

def test_lowercase_server_env_option_normalized(self):
opts = ServerOptions(
entrypoint_fnc=lambda ctx: None,
log_level=ServerEnvOption(dev_default="debug", prod_default="error"),
)
assert ServerEnvOption.getvalue(opts.log_level, True) == "DEBUG"
assert ServerEnvOption.getvalue(opts.log_level, False) == "ERROR"

@patch("livekit.agents.cli.cli._run_worker")
def test_lowercase_server_opts_works_through_cli(self, mock_run_worker, runner):
server = _make_server(log_level="error")
app = _build_cli(server)
result = runner.invoke(app, ["start"])
assert result.exit_code == 0
assert mock_run_worker.call_args.kwargs["args"].log_level == "ERROR"
Loading