Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ __pycache__/
.variables
.userpersistency
build
logging
logs_scorep_jupyter/
**/*.egg-info
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ To see the detailed report for marshalling steps - `SCOREP_JUPYTER_MARSHALLING_D
%env SCOREP_JUPYTER_MARSHALLING_DETAILED_REPORT=1
```
You can disable visual animations shown during long-running tasks by setting the `SCOREP_JUPYTER_DISABLE_PROCESSING_ANIMATIONS` environment variable.
This can be useful for debugging, as it ensures that any error messages from your code in cells are shown without being overwritten.
It is also helpful when running code that produces its own progress bars (e.g., using `tqdm`), to prevent output from being obscured.
```
%env SCOREP_JUPYTER_DISABLE_PROCESSING_ANIMATIONS=1
```
Expand Down Expand Up @@ -205,9 +207,21 @@ When dealing with big data structures, there might be a big runtime overhead at

## Logging Configuration
To adjust logging and obtain more detailed output about the behavior of the scorep_jupyter kernel, refer to the `src/logging_config.py` file.

This file contains configuration options for controlling the verbosity, format, and destination of log messages. You can customize it to suit your debugging needs.

Log files are stored in the following directory:
```
scorep_jupyter_kernel_python/
├── logs_scorep_jupyter/
│   ├── debug.log
│   ├── info.log
│   └── error.log
```
In some cases, you may want to suppress tqdm messages that are saved to error.log (since tqdm outputs to stderr). This can be done using the following environment variable:
```
%env TQDM_DISABLE=1
```

# Future Work

The kernel is still under development.
Expand Down
237 changes: 143 additions & 94 deletions src/scorep_jupyter/kernel.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
import datetime
import importlib
import logging.config
import os
import re
import selectors
import shutil
import subprocess
import sys
import threading
import time
import shutil
import logging.config

from enum import Enum
from textwrap import dedent
from typing import IO, AnyStr, Callable, List, TextIO

from ipykernel.ipkernel import IPythonKernel
from scorep_jupyter.userpersistence import PersHelper, scorep_script_name
from scorep_jupyter.userpersistence import magics_cleanup, create_busy_spinner
import importlib

from scorep_jupyter.kernel_messages import (
KernelErrorCode,
KERNEL_ERROR_MESSAGES,
get_scorep_process_error_hint,
)
from scorep_jupyter.userpersistence import PersHelper, scorep_script_name
from scorep_jupyter.userpersistence import magics_cleanup, create_busy_spinner
from .logging_config import LOGGING

# import scorep_jupyter.multinode_monitor.slurm_monitor as slurm_monitor

from .logging_config import LOGGING

PYTHON_EXECUTABLE = sys.executable
userpersistence_token = "scorep_jupyter.userpersistence"
jupyter_dump = "jupyter_dump.pkl"
Expand Down Expand Up @@ -234,7 +235,6 @@ def append_multicellmode(self, code):
f"print('Executing cell {self.multicell_cellcount}')\n"
+ f"print('''{code}''')\n"
+ f"print('-' * {max_line_len})\n"
+ "print('MCM_TS'+str(time.time()))\n"
+ f"{code}\n"
+ "print('''\n''')\n"
)
Expand Down Expand Up @@ -459,6 +459,7 @@ async def scorep_execute(
allow_stdin=False,
*,
cell_id=None,
is_multicell_final=False,
):
"""
Execute given code with Score-P Python bindings instrumentation.
Expand Down Expand Up @@ -563,40 +564,17 @@ async def scorep_execute(
self.pershelper.postprocess()
return reply_status_dump

# Empty cell output, required for interactive output
# e.g. tqdm for-loop progress bar
self.cell_output("\0")

stdout_lock = threading.Lock()
process_busy_spinner = create_busy_spinner(stdout_lock)
process_busy_spinner.start("Process is running...")
self.start_reading_scorep_process_streams(proc, is_multicell_final)

# Due to splitting into scorep-kernel and ipython extension,
# multicell mode is not supported for coarse-grained measurements
# anymore (in the extension) and we do not show the single cells in
# the ipython extension visualizations after executing them with scorep
# however, since we are using scorep anyway, the ipython extension is
# not useful, since we can count hardware counters anyway
# multicellmode_timestamps = []

try:
# multicellmode_timestamps =
self.read_scorep_process_pipe(proc, stdout_lock)
process_busy_spinner.stop("Done.")
except KeyboardInterrupt:
process_busy_spinner.stop("Kernel interrupted.")
if proc.poll():
self.pershelper.postprocess()
self.log_error(
KernelErrorCode.PERSISTENCE_LOAD_FAIL,
direction="Score-P -> Jupyter",
optional_hint = get_scorep_process_error_hint()
)
return self.standard_reply()

# In disk mode, subprocess already terminated
# after dumping persistence to file
if self.pershelper.mode == "disk":
if proc.returncode:
self.pershelper.postprocess()
self.cell_output(
"KernelError: Cell execution failed, cell persistence "
"was not recorded.",
"stderr",
)
return self.standard_reply()
# Ghost cell - load subprocess persistence back to Jupyter notebook
# Run in a "silent" way to not increase cells counter
reply_status_update = await super().do_execute(
Expand All @@ -607,25 +585,16 @@ async def scorep_execute(
allow_stdin=allow_stdin,
cell_id=cell_id,
)

if reply_status_update["status"] != "ok":
self.log_error(
KernelErrorCode.PERSISTENCE_LOAD_FAIL,
direction="Score-P -> Jupyter",
optional_hint = get_scorep_process_error_hint()
)
self.pershelper.postprocess()
return reply_status_update

# In memory mode, subprocess terminates once jupyter_update is
# executed and pipe is closed
if self.pershelper.mode == "memory":
if proc.poll():
self.pershelper.postprocess()
self.log_error(
KernelErrorCode.PERSISTENCE_LOAD_FAIL,
direction="Score-P -> Jupyter",
)
return self.standard_reply()

# Determine directory to which trace files were saved by Score-P
scorep_folder = ""
if "SCOREP_EXPERIMENT_DIRECTORY" in os.environ:
Expand Down Expand Up @@ -669,66 +638,145 @@ async def scorep_execute(
self.pershelper.postprocess()
return self.standard_reply()

def read_scorep_process_pipe(
self, proc: subprocess.Popen[bytes], stdout_lock: threading.Lock
) -> list:
def start_reading_scorep_process_streams(
    self,
    proc: subprocess.Popen[bytes],
    is_multicell_final: bool,
):
    """
    Read stdout and stderr of the Score-P-instrumented subprocess
    concurrently and forward their output to the notebook.

    stderr is consumed on a dedicated thread while stdout is read on the
    current thread; both share ``stdout_lock`` so their writes do not
    interleave with each other or with the busy-spinner animation.
    Output produced while the spinner is still animating is buffered
    (the spinner would overwrite it) and replayed once the spinner stops.

    Args:
        proc (subprocess.Popen[bytes]): The subprocess whose output is
            being read.
        is_multicell_final (bool): If multicell mode is finalizing -
            spinner must be disabled.
    """
    stdout_lock = threading.Lock()
    spinner_stop_event = threading.Event()
    process_busy_spinner = create_busy_spinner(
        stdout_lock, spinner_stop_event, is_multicell_final
    )

    captured_stdout: List[str] = []
    # Output parameter: a thread target cannot return a value, so the
    # stderr reader appends into this shared list instead.
    captured_stderr: List[str] = []
    t_stderr = threading.Thread(
        target=self.read_scorep_stderr,
        args=(proc.stderr, stdout_lock, spinner_stop_event, captured_stderr),
    )

    # Empty cell output, required for interactive output
    # e.g. tqdm for-loop progress bar
    self.cell_output("\0")

    spinner_message = "Done."

    try:
        process_busy_spinner.start("Process is running...")
        t_stderr.start()

        captured_stdout = self.read_scorep_stdout(
            proc.stdout, stdout_lock, spinner_stop_event
        )

    except KeyboardInterrupt:
        spinner_message = "Kernel interrupted."
    finally:
        # Always reap the stderr thread and stop the animation, even on
        # interrupt, so the terminal is left in a clean state.
        t_stderr.join()
        process_busy_spinner.stop(spinner_message)

    # Handle recorded output (in case if it is suppressed by spinner animation)
    self.handle_captured_output(captured_stdout, stream="stdout")
    self.handle_captured_output(captured_stderr, stream="stderr")

def read_scorep_stdout(
    self,
    stdout: IO[AnyStr],
    lock: threading.Lock,
    spinner_stop_event: threading.Event,
    read_chunk_size=64,
) -> List[str]:
    """
    Read the Score-P subprocess stdout and forward complete lines to the
    notebook cell output.

    While the busy spinner is still animating (``spinner_stop_event``
    not set) lines are buffered and returned so the caller can replay
    them after the spinner stops; once the event is set each line is
    written directly to the cell output, blanking the spinner line
    first.

    Args:
        stdout: Byte stream of the subprocess stdout.
        lock: Lock synchronizing access to the notebook output.
        spinner_stop_event: Set once the busy spinner has stopped.
        read_chunk_size: Number of bytes to read per iteration.

    Returns:
        List[str]: Lines captured while the spinner was running.
    """
    line_width = 50
    # Carriage-return sequence that blanks the spinner's current line
    # before real output is written over it.
    clear_line = "\r" + " " * line_width + "\r"

    captured_stdout: List[str] = []

    def process_stdout_line(line: str):
        if spinner_stop_event.is_set():
            sys.stdout.write(clear_line)
            sys.stdout.flush()
            self.cell_output(line)
        else:
            # Spinner still running: buffer to avoid overlapping output.
            captured_stdout.append(line)

    self.read_scorep_stream(
        stdout, lock, process_stdout_line, read_chunk_size
    )
    return captured_stdout

def read_scorep_stderr(
    self,
    stderr: IO[AnyStr],
    lock: threading.Lock,
    spinner_stop_event: threading.Event,
    captured_stderr: List[str],
    read_chunk_size=64,
):
    """
    Consume the Score-P subprocess stderr stream line by line.

    Lines arriving while the busy spinner is still animating are
    appended to ``captured_stderr`` (output parameter) for later replay;
    afterwards each line is logged and written to the cell's stderr.

    Args:
        stderr: Byte stream of the subprocess stderr.
        lock: Lock synchronizing access to the notebook output.
        spinner_stop_event: Set once the busy spinner has stopped.
        captured_stderr: Shared list receiving buffered lines.
        read_chunk_size: Number of bytes to read per iteration.
    """

    def handle_line(line: str):
        if not spinner_stop_event.is_set():
            # Spinner still animating: buffer the line for later replay.
            captured_stderr.append(line)
            return
        self.log.error(line.strip())
        self.cell_output(line, 'stderr')

    self.read_scorep_stream(stderr, lock, handle_line, read_chunk_size)

def read_scorep_stream(
    self,
    stream: IO[AnyStr],
    lock: threading.Lock,
    process_line: Callable[[str], None],
    read_chunk_size: int = 64,
):
    """
    Read a subprocess output stream in fixed-size chunks and dispatch
    complete lines to ``process_line``.

    Chunks are decoded with the default encoding (decode errors are
    ignored) and reassembled into lines terminated by ``\\r`` or
    ``\\n``; a partial line at the end of a chunk is buffered until the
    rest arrives. Each line is dispatched while holding ``lock`` so it
    cannot interleave with other writers of the notebook output.

    Args:
        stream: Byte stream of the subprocess (stdout or stderr).
        lock: Lock synchronizing access to the notebook output.
        process_line: Callback invoked once per completed line.
        read_chunk_size: Number of bytes to read per iteration.
    """
    incomplete_line = ""
    # Match either a line ending in \r or \n, or a trailing fragment
    # without a terminator (kept as the carry-over for the next chunk).
    endline_pattern = re.compile(r"(.*?[\r\n]|.+$)")

    while True:
        chunk = stream.read(read_chunk_size)
        if not chunk:
            break
        chunk = chunk.decode(sys.getdefaultencoding(), errors="ignore")
        lines = endline_pattern.findall(chunk)
        if lines:
            lines[0] = incomplete_line + lines[0]
            if lines[-1][-1] not in ["\n", "\r"]:
                incomplete_line = lines.pop(-1)
            else:
                incomplete_line = ""
        for line in lines:
            with lock:
                process_line(line)

    # Bug fix: flush a final line that ended without a newline —
    # previously it stayed in ``incomplete_line`` and was silently
    # dropped at EOF.
    if incomplete_line:
        with lock:
            process_line(incomplete_line)

def handle_captured_output(self, output: List[str], stream: str):
    """
    Replay output lines that were buffered while the spinner ran.

    Joins ``output`` into a single text block and sends it to the cell
    output on the named stream; stderr text is additionally logged.
    An unrecognized stream name is reported via the error log.
    """
    if not output:
        return
    text = "".join(output)
    if stream not in ("stdout", "stderr"):
        self.log.error(f"Undefined stream type: {stream}")
        return
    self.cell_output(text, stream=stream)
    if stream == "stderr":
        # stderr is also persisted to the log files.
        self.log.error(text)

async def do_execute(
self,
Expand Down Expand Up @@ -778,6 +826,7 @@ async def do_execute(
user_expressions,
allow_stdin,
cell_id=cell_id,
is_multicell_final=True,
)
except Exception:
self.cell_output(
Expand Down Expand Up @@ -879,7 +928,7 @@ def log_error(self, code: KernelErrorCode, **kwargs):
)
message = template.format(mode=mode, marshaller=marshaller, **kwargs)

self.log.error(message)
self.log.error(message.strip())
self.cell_output("KernelError: " + message, "stderr")


Expand Down
Loading