Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 154 additions & 4 deletions src/ghstack/logs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/usr/bin/env python3

from __future__ import annotations

import contextlib
import datetime
import functools
Expand All @@ -9,8 +11,9 @@
import shutil
import subprocess
import sys
import time
import uuid
from typing import Dict, Iterator, Optional
from typing import Any, Dict, Iterator, List, Optional

DATETIME_FORMAT = "%Y-%m-%d_%Hh%Mm%Ss"

Expand Down Expand Up @@ -60,6 +63,129 @@ def redact(self, needle: str, replace: str = "<REDACTED>") -> None:
formatter = Formatter(fmt="%(levelname)s: %(message)s", datefmt="")


class HandlerMetrics:
    """Accumulates per-handler logging statistics: record count, encoded
    sizes, and cumulative wall-clock time spent in each emit phase."""

    def __init__(self, name: str) -> None:
        # Handler label used when the metrics are reported.
        self.name = name
        # Number of records emitted through the handler.
        self.records = 0
        # Total encoded bytes written across all records.
        self.bytes = 0
        # Largest single encoded record seen so far.
        self.max_record_bytes = 0
        # Cumulative seconds spent in each phase of emit.
        self.emit_seconds = 0.0
        self.format_seconds = 0.0
        self.write_seconds = 0.0
        self.flush_seconds = 0.0

    def record(
        self,
        *,
        record_bytes: int,
        emit_seconds: float,
        format_seconds: float,
        write_seconds: float,
        flush_seconds: float,
    ) -> None:
        """Fold one emitted record's size and phase timings into the totals."""
        self.records += 1
        self.bytes += record_bytes
        if record_bytes > self.max_record_bytes:
            self.max_record_bytes = record_bytes
        self.emit_seconds += emit_seconds
        self.format_seconds += format_seconds
        self.write_seconds += write_seconds
        self.flush_seconds += flush_seconds


def _log_metrics_enabled() -> bool:
return os.environ.get("GHSTACK_LOG_METRICS", "").lower() in (
"1",
"true",
"yes",
"on",
)


def _encoded_len(stream: Any, s: str) -> int:
encoding = getattr(stream, "encoding", None) or "utf-8"
return len(s.encode(encoding, errors="backslashreplace"))


def _emit_with_metrics(
handler: logging.StreamHandler[Any],
record: logging.LogRecord,
metrics: HandlerMetrics,
) -> None:
emit_start = time.perf_counter()
try:
format_start = time.perf_counter()
msg = handler.format(record)
format_seconds = time.perf_counter() - format_start

stream = handler.stream
output = msg + handler.terminator
record_bytes = _encoded_len(stream, output)

write_start = time.perf_counter()
stream.write(output)
write_seconds = time.perf_counter() - write_start

flush_start = time.perf_counter()
handler.flush()
flush_seconds = time.perf_counter() - flush_start

emit_seconds = time.perf_counter() - emit_start
metrics.record(
record_bytes=record_bytes,
emit_seconds=emit_seconds,
format_seconds=format_seconds,
write_seconds=write_seconds,
flush_seconds=flush_seconds,
)
except RecursionError:
raise
except Exception:
handler.handleError(record)


class MetricStreamHandler(logging.StreamHandler):  # type: ignore[type-arg]
    """StreamHandler (default stream: stderr) whose emits are timed into
    a shared HandlerMetrics accumulator."""

    def __init__(self, metrics: HandlerMetrics) -> None:
        super().__init__()
        # Shared accumulator, read later by the metrics report.
        self.metrics = metrics

    def emit(self, record: logging.LogRecord) -> None:
        # Route through the instrumented emit instead of the stdlib one.
        _emit_with_metrics(self, record, self.metrics)


class MetricFileHandler(logging.FileHandler):
    """FileHandler variant whose emits are timed into a shared
    HandlerMetrics accumulator."""

    def __init__(self, filename: str, metrics: HandlerMetrics) -> None:
        super().__init__(filename)
        # Shared accumulator, read later by the metrics report.
        self.metrics = metrics

    def emit(self, record: logging.LogRecord) -> None:
        # Route through the instrumented emit instead of the stdlib one.
        _emit_with_metrics(self, record, self.metrics)


def _report_metrics(metrics: List[HandlerMetrics], log_file: str) -> None:
for metric in metrics:
sys.stderr.write(
"[ghstack logging] {name}: records={records} bytes={bytes} "
"max_record_bytes={max_record_bytes} emit={emit:.1f}ms "
"format={format:.1f}ms write={write:.1f}ms flush={flush:.1f}ms\n".format(
name=metric.name,
records=metric.records,
bytes=metric.bytes,
max_record_bytes=metric.max_record_bytes,
emit=metric.emit_seconds * 1000,
format=metric.format_seconds * 1000,
write=metric.write_seconds * 1000,
flush=metric.flush_seconds * 1000,
)
)
try:
log_size = os.path.getsize(log_file)
except OSError:
return
sys.stderr.write(
"[ghstack logging] file_size={} path={}\n".format(log_size, log_file)
)


@contextlib.contextmanager
def manager(*, debug: bool = False) -> Iterator[None]:
# TCB code to setup logging. If a failure starts here we won't
Expand All @@ -70,8 +196,17 @@ def manager(*, debug: bool = False) -> Iterator[None]:
# stderr (INFO) and file handler (DEBUG).
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)

console_handler = logging.StreamHandler()
metrics: List[HandlerMetrics] = []
log_metrics = _log_metrics_enabled()

if log_metrics:
console_metrics = HandlerMetrics("console")
metrics.append(console_metrics)
console_handler: logging.StreamHandler[Any] = MetricStreamHandler(
console_metrics
)
else:
console_handler = logging.StreamHandler()
if debug:
console_handler.setLevel(logging.DEBUG)
else:
Expand All @@ -81,7 +216,12 @@ def manager(*, debug: bool = False) -> Iterator[None]:

log_file = os.path.join(run_dir(), "ghstack.log")

file_handler = logging.FileHandler(log_file)
if log_metrics:
file_metrics = HandlerMetrics("file")
metrics.append(file_metrics)
file_handler: logging.FileHandler = MetricFileHandler(log_file, file_metrics)
else:
file_handler = logging.FileHandler(log_file)
# TODO: Hypothetically, it is better if we log the timestamp.
# But I personally feel the timestamps gunk up the log info
# for not much benefit (since we're not really going to be
Expand Down Expand Up @@ -109,6 +249,16 @@ def manager(*, debug: bool = False) -> Iterator[None]:
record_exception(e)
sys.exit(1)

finally:
console_handler.flush()
file_handler.flush()
if log_metrics:
_report_metrics(metrics, log_file)
root_logger.removeHandler(console_handler)
root_logger.removeHandler(file_handler)
console_handler.close()
file_handler.close()


@functools.lru_cache()
def base_dir() -> str:
Expand Down
Loading