- Core Benchmark Classes
- Metrics Classes
- Profiling and Telemetry
- Visualization
- Server Management
- Benchmark Suite
BenchmarkContext is a context manager for integrating benchmarking into existing applications. It is useful for production monitoring and targeted performance measurement.
Location: rpycbench.core.benchmark
Constructor:
BenchmarkContext(
name: str,
protocol: str,
server_mode: Optional[str] = None,
measure_connection: bool = False,
measure_latency: bool = True,
measure_bandwidth: bool = False,
measure_system: bool = True,
)
Parameters:
name(str): Identifier for this benchmark context
protocol(str): Protocol being measured, e.g., 'rpyc' or 'http'
server_mode(Optional[str]): Server mode if applicable ('threaded', 'forking', 'oneshot')
measure_connection(bool): Whether to measure connection establishment time
measure_latency(bool): Whether to measure request latency
measure_bandwidth(bool): Whether to measure bandwidth
measure_system(bool): Whether to measure system resources (CPU, memory)
Methods:
measure_connection_time(): Context manager for measuring connection establishment time.
with ctx.measure_connection_time():
    conn = rpyc.connect('localhost', 18812)
Returns: Context manager
measure_request(bytes_sent, bytes_received): Context manager for measuring request execution.
with ctx.measure_request(bytes_sent=1024, bytes_received=2048):
    result = conn.root.some_method()
Parameters:
bytes_sent(int): Number of bytes sent in this request
bytes_received(int): Number of bytes received in this request
Returns: Context manager
record_request(success): Record a request completion status.
ctx.record_request(success=True)
Parameters:
success(bool): Whether the request succeeded
get_results(): Get the collected benchmark metrics.
metrics = ctx.get_results()
Returns: BenchmarkMetrics
Example:
from rpycbench.core.benchmark import BenchmarkContext
ctx = BenchmarkContext(
name="user_fetch",
protocol="rpyc",
measure_latency=True
)
with ctx.measure_request(bytes_sent=512, bytes_received=4096):
    user = conn.root.get_user(123)
ctx.record_request(success=True)
metrics = ctx.get_results()
stats = metrics.compute_statistics()
print(f"Mean latency: {stats['latency']['mean']*1000:.2f}ms")Measures connection establishment performance by creating multiple connections and timing each one.
Location: rpycbench.core.benchmark
Constructor:
ConnectionBenchmark(
name: str,
protocol: str,
server_mode: Optional[str],
connection_factory: Callable[[], Any],
num_connections: int = 100,
)
Parameters:
name(str): Benchmark name
protocol(str): Protocol being tested ('rpyc', 'http', etc.)
server_mode(Optional[str]): Server mode ('threaded', 'forking', 'oneshot')
connection_factory(Callable): Function that creates and returns a new connection
num_connections(int): Number of connections to establish for sampling
Methods:
run(): Execute the benchmark.
metrics = bench.run()
Returns: BenchmarkMetrics
Example:
from rpycbench.core.benchmark import ConnectionBenchmark
import rpyc
bench = ConnectionBenchmark(
name="rpyc_connection",
protocol="rpyc",
server_mode="threaded",
connection_factory=lambda: rpyc.connect('localhost', 18812),
num_connections=100
)
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Mean connection time: {stats['connection_time']['mean']*1000:.2f}ms")
print(f"P99 connection time: {stats['connection_time']['p99']*1000:.2f}ms")Measures request/response latency by executing a request function repeatedly.
Location: rpycbench.core.benchmark
Constructor:
LatencyBenchmark(
name: str,
protocol: str,
server_mode: Optional[str],
connection_factory: Callable[[], Any],
request_func: Callable[[Any], Any],
num_requests: int = 1000,
warmup_requests: int = 10,
)
Parameters:
name(str): Benchmark name
protocol(str): Protocol being tested
server_mode(Optional[str]): Server mode
connection_factory(Callable): Function that creates a connection
request_func(Callable): Function that takes a connection and performs a request
num_requests(int): Number of requests to execute
warmup_requests(int): Number of warmup requests to exclude from statistics
Methods:
run(): Execute the benchmark.
Returns: BenchmarkMetrics
Example:
from rpycbench.core.benchmark import LatencyBenchmark
import rpyc
def echo_request(conn):
    return conn.root.echo("test")
bench = LatencyBenchmark(
name="echo_latency",
protocol="rpyc",
server_mode="threaded",
connection_factory=lambda: rpyc.connect('localhost', 18812),
request_func=echo_request,
num_requests=1000,
warmup_requests=10
)
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Mean latency: {stats['latency']['mean']*1000:.2f}ms")
print(f"P50 latency: {stats['latency']['median']*1000:.2f}ms")
print(f"P95 latency: {stats['latency']['p95']*1000:.2f}ms")
print(f"P99 latency: {stats['latency']['p99']*1000:.2f}ms")Measures upload and download bandwidth for various data payload sizes.
Location: rpycbench.core.benchmark
Constructor:
BandwidthBenchmark(
name: str,
protocol: str,
server_mode: Optional[str],
connection_factory: Callable[[], Any],
upload_func: Callable[[Any, bytes], Any],
download_func: Callable[[Any, int], bytes],
data_sizes: Optional[List[int]] = None,
iterations: int = 10,
)
Parameters:
name(str): Benchmark name
protocol(str): Protocol being tested
server_mode(Optional[str]): Server mode
connection_factory(Callable): Function that creates a connection
upload_func(Callable): Function that takes connection and data bytes, uploads them
download_func(Callable): Function that takes connection and size, downloads that many bytes
data_sizes(Optional[List[int]]): List of payload sizes to test. Default: [1KB, 10KB, 100KB, 1MB]
iterations(int): Number of iterations per data size
Methods:
run(): Execute the benchmark.
Returns: BenchmarkMetrics
Example:
from rpycbench.core.benchmark import BandwidthBenchmark
import rpyc
bench = BandwidthBenchmark(
name="data_transfer",
protocol="rpyc",
server_mode="threaded",
connection_factory=lambda: rpyc.connect('localhost', 18812),
upload_func=lambda c, data: c.root.receive_data(data),
download_func=lambda c, size: c.root.send_data(size),
data_sizes=[1024, 10*1024, 100*1024, 1024*1024],
iterations=20
)
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Upload bandwidth: {stats['upload_bandwidth']['mean']/1024/1024:.2f} MB/s")
print(f"Download bandwidth: {stats['download_bandwidth']['mean']/1024/1024:.2f} MB/s")Measures large file transfer performance with various chunk sizes and transfer methods.
Location: rpycbench.core.benchmark
Constructor:
BinaryTransferBenchmark(
name: str,
protocol: str,
server_mode: Optional[str],
connection_factory: Callable[[], Any],
upload_func: Callable[[Any, bytes], Any],
download_func: Callable[[Any, int], bytes],
upload_chunked_func: Optional[Callable[[Any, List[bytes]], Any]] = None,
download_chunked_func: Optional[Callable[[Any, int, int], List[bytes]]] = None,
file_sizes: Optional[List[int]] = None,
chunk_size: Optional[int] = None,
iterations: int = 3,
test_upload: bool = True,
test_download: bool = True,
test_chunked: bool = True,
)
Parameters:
name(str): Benchmark name
protocol(str): Protocol being tested
server_mode(Optional[str]): Server mode
connection_factory(Callable): Function that creates a connection
upload_func(Callable): Function for uploading entire file at once
download_func(Callable): Function for downloading entire file at once
upload_chunked_func(Optional[Callable]): Function for uploading file in chunks
download_chunked_func(Optional[Callable]): Function for downloading file in chunks
file_sizes(Optional[List[int]]): File sizes to test. Default: [1.5MB, 128MB, 500MB]
chunk_size(Optional[int]): Chunk size for chunked transfers. Default: 64KB
iterations(int): Number of iterations per file size
test_upload(bool): Whether to test uploads
test_download(bool): Whether to test downloads
test_chunked(bool): Whether to test chunked transfers
Methods:
run(): Execute the benchmark.
Returns: BenchmarkMetrics
Example:
from rpycbench.core.benchmark import BinaryTransferBenchmark
import rpyc
bench = BinaryTransferBenchmark(
name="file_transfer",
protocol="rpyc",
server_mode="threaded",
connection_factory=lambda: rpyc.connect('localhost', 18812),
upload_func=lambda c, data: c.root.upload_file(data),
download_func=lambda c, size: c.root.download_file(size),
upload_chunked_func=lambda c, chunks: c.root.upload_chunked(chunks),
download_chunked_func=lambda c, size, cs: c.root.download_chunked(size, cs),
file_sizes=[10*1024*1024, 100*1024*1024], # 10MB, 100MB
chunk_size=64*1024,
iterations=3
)
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Upload: {stats['upload_bandwidth']['mean']/1024/1024:.2f} MB/s")
print(f"Download: {stats['download_bandwidth']['mean']/1024/1024:.2f} MB/s")Measures performance under concurrent load by simulating multiple clients.
Location: rpycbench.core.benchmark
Constructor:
ConcurrentBenchmark(
name: str,
protocol: str,
server_mode: Optional[str],
connection_factory: Callable[[], Any],
request_func: Callable[[Any], Any],
num_clients: int = 128,
requests_per_client: int = 100,
max_workers: Optional[int] = None,
track_per_connection: bool = False,
)
Parameters:
name(str): Benchmark name
protocol(str): Protocol being tested
server_mode(Optional[str]): Server mode
connection_factory(Callable): Function that creates a connection
request_func(Callable): Function that performs a request
num_clients(int): Number of concurrent clients to simulate
requests_per_client(int): Number of requests each client should make
max_workers(Optional[int]): Maximum thread pool size. Default: num_clients
track_per_connection(bool): Whether to track metrics per connection (higher memory usage)
Methods:
run(): Execute the benchmark.
Returns: BenchmarkMetrics
Example:
from rpycbench.core.benchmark import ConcurrentBenchmark
import rpyc
bench = ConcurrentBenchmark(
name="concurrent_load",
protocol="rpyc",
server_mode="threaded",
connection_factory=lambda: rpyc.connect('localhost', 18812),
request_func=lambda c: c.root.process_request(),
num_clients=128,
requests_per_client=100,
track_per_connection=False
)
metrics = bench.run()
stats = metrics.compute_statistics()
success_rate = (1 - metrics.failed_requests / metrics.total_requests) * 100
print(f"Success rate: {success_rate:.1f}%")
print(f"Mean latency: {stats['latency']['mean']*1000:.2f}ms")
print(f"P99 latency: {stats['latency']['p99']*1000:.2f}ms")
print(f"Failed requests: {metrics.failed_requests}/{metrics.total_requests}")Container for benchmark measurements and statistics.
Location: rpycbench.core.metrics
Constructor:
@dataclass
class BenchmarkMetrics:
name: str
protocol: str
server_mode: Optional[str]
connection_times: List[float]
latencies: List[float]
upload_bandwidth: List[float]
download_bandwidth: List[float]
concurrent_connections: int
total_requests: int
failed_requests: int
cpu_usage: List[float]
memory_usage: List[float]
metadata: Dict[str, Any]
Attributes:
name(str): Benchmark name
protocol(str): Protocol tested
server_mode(Optional[str]): Server mode
connection_times(List[float]): Connection establishment times in seconds
latencies(List[float]): Request latencies in seconds
upload_bandwidth(List[float]): Upload bandwidth measurements in bytes/second
download_bandwidth(List[float]): Download bandwidth measurements in bytes/second
concurrent_connections(int): Number of concurrent connections
total_requests(int): Total number of requests attempted
failed_requests(int): Number of failed requests
cpu_usage(List[float]): CPU usage percentages
memory_usage(List[float]): Memory usage in bytes
metadata(Dict[str, Any]): Additional metadata
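Because BenchmarkMetrics is a plain dataclass, it can also be constructed directly, for example to feed measurements collected outside rpycbench into compute_statistics(). The following is a minimal sketch assuming the fields listed above; the sample values are illustrative only.
from rpycbench.core.metrics import BenchmarkMetrics
# Hypothetical: build a metrics object from externally collected samples
# (times in seconds, bandwidth in bytes/second).
imported = BenchmarkMetrics(
    name="imported_run",
    protocol="rpyc",
    server_mode="threaded",
    connection_times=[0.004, 0.005, 0.006],
    latencies=[0.011, 0.012, 0.015],
    upload_bandwidth=[],
    download_bandwidth=[],
    concurrent_connections=1,
    total_requests=3,
    failed_requests=0,
    cpu_usage=[],
    memory_usage=[],
    metadata={"source": "manual import"},
)
print(f"Mean latency: {imported.compute_statistics()['latency']['mean']*1000:.2f}ms")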
Methods:
Record a connection establishment time.
Parameters:
duration(float): Time in seconds
Record a request latency.
Parameters:
duration(float): Time in seconds
Record an upload operation.
Parameters:
bytes_sent(int): Number of bytes sent
duration(float): Time taken in seconds
Record a download operation.
Parameters:
bytes_received(int): Number of bytes received
duration(float): Time taken in seconds
Capture current CPU and memory usage.
Compute comprehensive statistics from collected metrics.
Returns: Dict with keys:
'connection_time': Dict with 'mean', 'median', 'min', 'max', 'stdev', 'p95', 'p99'
'latency': Dict with 'mean', 'median', 'min', 'max', 'stdev', 'p95', 'p99'
'upload_bandwidth': Dict with 'mean', 'median', 'min', 'max', 'stdev', 'p95', 'p99'
'download_bandwidth': Dict with 'mean', 'median', 'min', 'max', 'stdev', 'p95', 'p99'
'cpu_usage': Dict with 'mean', 'median', 'min', 'max'
'memory_usage': Dict with 'mean', 'median', 'min', 'max'
Example:
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Latency mean: {stats['latency']['mean']*1000:.2f}ms")
print(f"Latency median: {stats['latency']['median']*1000:.2f}ms")
print(f"Latency P95: {stats['latency']['p95']*1000:.2f}ms")
print(f"Latency P99: {stats['latency']['p99']*1000:.2f}ms")
print(f"Latency stdev: {stats['latency']['stdev']*1000:.2f}ms")Interpreting Metrics:
Understanding what metric values mean helps diagnose performance issues:
| Metric | Normal Range | Warning Signs | Diagnosis |
|---|---|---|---|
| CPU Usage | |||
cpu_usage['mean'] |
30-70% | > 80% | High CPU - check if GIL-bound (threaded mode) |
| < 10% | Server idle or I/O-bound | ||
| Latency | |||
latency['p99'] / latency['median'] |
< 3x | > 5x | Inconsistent performance - resource contention |
| > 10x | Severe contention - GIL or I/O limits | ||
latency['stdev'] |
Low relative to mean | High stdev | High variance indicates instability |
| Success Rate | |||
(1 - failed/total) * 100 |
> 99% | 95-99% | Approaching limits |
| < 95% | Server exhaustion - increase ulimits | ||
| Memory Usage | |||
memory_usage['mean'] |
Stable | Growing | Memory leak or unbounded caching |
Common Patterns:
stats = metrics.compute_statistics()
success_rate = (1 - metrics.failed_requests / metrics.total_requests) * 100
# Pattern 1: GIL Contention (CPU-bound threaded mode)
if stats['cpu_usage']['mean'] > 80 and stats['latency']['p99'] / stats['latency']['median'] > 5:
    print("DIAGNOSIS: GIL contention in threaded mode")
    print("ACTION: Switch to server_mode='forking' for CPU-bound workload")
# Pattern 2: I/O Bottleneck
if stats['cpu_usage']['mean'] < 30 and stats['latency']['p99'] > 0.1:
    print("DIAGNOSIS: I/O bottleneck - low CPU but high latency")
    print("ACTION: Optimize database queries, add connection pooling")
# Pattern 3: Server Exhaustion
if success_rate < 95:
    print("DIAGNOSIS: Server dropping connections - exhausted")
    print("ACTION: Increase ulimits (ulimit -n 65536), reduce load")
# Pattern 4: Consistent Performance
target_latency = 0.05  # application-specific latency budget in seconds
if stats['latency']['p99'] / stats['latency']['median'] < 3:
    print("DIAGNOSIS: Consistent performance - well-behaved system")
    if stats['latency']['mean'] > target_latency:
        print("ACTION: Latency is consistent but too high - optimize application logic")
Red Flags:
- High CPU (>80%) + High P99/P50 ratio (>5): GIL contention - switch to forking mode
- Low CPU (<30%) + High P99 (>100ms): I/O bottleneck - optimize I/O operations
- Success rate < 95%: Server exhaustion - increase limits or reduce concurrency
- High memory growth: Potential memory leak - investigate caching and object lifecycle
- P99 > 10x P50: Severe resource contention - system under stress
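These thresholds can be folded into a small reusable check. The helper below is an illustrative sketch (check_red_flags is not part of rpycbench); it applies the ratios above to the stats dict from compute_statistics() and the raw metrics object.
def check_red_flags(stats, metrics):
    # Illustrative helper: apply the red-flag thresholds listed above.
    flags = []
    lat = stats['latency']
    cpu = stats['cpu_usage']['mean']
    ratio = lat['p99'] / lat['median'] if lat['median'] else 0.0
    success_rate = (1 - metrics.failed_requests / metrics.total_requests) * 100
    if cpu > 80 and ratio > 5:
        flags.append("GIL contention - switch to forking mode")
    if cpu < 30 and lat['p99'] > 0.1:
        flags.append("I/O bottleneck - optimize I/O operations")
    if success_rate < 95:
        flags.append("Server exhaustion - increase limits or reduce concurrency")
    if ratio > 10:
        flags.append("Severe resource contention - system under stress")
    return flags

for flag in check_red_flags(metrics.compute_statistics(), metrics):
    print(f"RED FLAG: {flag}")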
Convert metrics to dictionary representation.
Returns: Dict
Convert metrics to JSON string.
Returns: str
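A common follow-up is persisting a run to disk so later runs can be compared. A short sketch follows, assuming the serialization methods above are exposed as to_dict() and to_json(); the exact method names are not confirmed by this reference.
import json
# Assumed method name: to_json() per the description above.
with open("rpyc_threaded_metrics.json", "w") as fh:
    fh.write(metrics.to_json())
# Reload the raw dictionary later for custom analysis.
with open("rpyc_threaded_metrics.json") as fh:
    raw = json.load(fh)
print(raw["name"], len(raw.get("latencies", [])))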
BenchmarkResults is a collection of multiple benchmark results for comparison.
Location: rpycbench.core.metrics
Constructor:
@dataclass
class BenchmarkResults:
results: List[BenchmarkMetrics] = field(default_factory=list)
Methods:
add_result(metrics): Add a benchmark result to the collection.
Parameters:
metrics(BenchmarkMetrics): Metrics to add
get_comparison_table(): Get a comparison table of all results.
Returns: Dict mapping benchmark names to their statistics
Example:
results = BenchmarkResults()
results.add_result(rpyc_metrics)
results.add_result(http_metrics)
comparison = results.get_comparison_table()
print(f"RPyC latency: {comparison['rpyc']['latency_mean']*1000:.2f}ms")
print(f"HTTP latency: {comparison['http']['latency_mean']*1000:.2f}ms")Export all results as JSON string.
Returns: str
Export all results as dictionary.
Returns: Dict
print_summary(): Print a human-readable summary of all results.
Example:
results = BenchmarkResults()
results.add_result(rpyc_threaded_metrics)
results.add_result(rpyc_forking_metrics)
results.add_result(http_metrics)
results.print_summary()
RPyCTelemetry tracks RPyC operations for profiling and performance analysis.
Location: rpycbench.utils.telemetry
Constructor:
RPyCTelemetry(
enabled: bool = True,
track_netrefs: bool = True,
track_stacks: bool = True,
slow_call_threshold: float = 0.1,
deep_stack_threshold: int = 5,
)
Parameters:
enabled(bool): Whether telemetry is enabled
track_netrefs(bool): Whether to track NetRef creation and lifecycle
track_stacks(bool): Whether to track call stacks
slow_call_threshold(float): Threshold in seconds for flagging slow calls
deep_stack_threshold(int): Threshold for flagging deep call stacks
Methods:
start_call(method_name, call_type, is_netref, netref_id): Start tracking a remote call.
Parameters:
method_name(str): Name of the method being called
call_type(str): Type of call ('method', 'getattr', 'setattr', etc.)
is_netref(bool): Whether the target is a NetRef
netref_id(Optional[int]): NetRef ID if applicable
Returns: int (call_id)
end_call(call_id: int, result_is_netref: bool = False, result_netref_id: Optional[int] = None, exception: Optional[Exception] = None)
End tracking a remote call.
Parameters:
call_id(int): Call ID returned from start_call
result_is_netref(bool): Whether result is a NetRef
result_netref_id(Optional[int]): NetRef ID of result if applicable
exception(Optional[Exception]): Exception if call failed
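In normal use ProfiledConnection issues these calls for you, but they can also be driven by hand around an existing connection. A minimal sketch based on the parameters documented above; conn and get_user are placeholders for your own connection and remote method.
# Hypothetical manual instrumentation of a single remote call.
call_id = telemetry.start_call(method_name="get_user", call_type="method")
try:
    result = conn.root.get_user(123)
    telemetry.end_call(call_id)
except Exception as exc:
    telemetry.end_call(call_id, exception=exc)
    raise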
Register a NetRef object for tracking.
Parameters:
netref_obj(Any): The NetRef object
created_by_call_id(Optional[int]): Call ID that created this NetRef
Returns: int (netref_id)
Get current call stack depth.
Returns: int
Get current call stack.
Returns: List[RPyCCallInfo]
Print current call stack to stdout.
Parameters:
title(str): Title for the stack trace
get_statistics(): Get telemetry statistics.
Returns: Dict with keys:
'total_calls': int
'slow_calls': int
'netrefs_created': int
'netrefs_accessed': int
'max_stack_depth': int
'total_duration': float
print_summary(): Print a comprehensive telemetry summary to stdout.
Clear all telemetry data.
Example:
from rpycbench.utils.telemetry import RPyCTelemetry
telemetry = RPyCTelemetry(
slow_call_threshold=0.05, # 50ms
deep_stack_threshold=3,
track_netrefs=True,
track_stacks=True
)
# Use with ProfiledConnection...
# ... after operations ...
telemetry.print_summary()
stats = telemetry.get_statistics()
print(f"Total calls: {stats['total_calls']}")
print(f"Slow calls: {stats['slow_calls']}")
print(f"NetRefs created: {stats['netrefs_created']}")Wrapper around RPyC connection that tracks all operations.
Location: rpycbench.utils.profiler
Constructor:
ProfiledConnection(
connection: rpyc.Connection,
telemetry_inst: Optional[RPyCTelemetry] = None,
auto_print_on_slow: bool = True,
auto_print_on_deep: bool = True,
)
Parameters:
connection(rpyc.Connection): The RPyC connection to wrap
telemetry_inst(Optional[RPyCTelemetry]): Telemetry instance to use. Creates new one if None
auto_print_on_slow(bool): Automatically print call stack when slow calls detected
auto_print_on_deep(bool): Automatically print call stack when deep nesting detected
Properties:
root: Access the root service object (returns a ProfiledNetRef).
profiled_conn.root.some_method()  # Tracked
telemetry: Access the underlying telemetry instance.
stats = profiled_conn.telemetry.get_statistics()
Methods:
close(): Close the underlying connection.
Example:
from rpycbench.utils.profiler import ProfiledConnection
from rpycbench.utils.telemetry import RPyCTelemetry
import rpyc
telemetry = RPyCTelemetry(slow_call_threshold=0.1)
conn = rpyc.connect('localhost', 18812)
profiled = ProfiledConnection(conn, telemetry_inst=telemetry)
result = profiled.root.some_method()
telemetry.print_summary()
profiled.close()
create_profiled_connection is a factory function for creating a profiled RPyC connection.
Location: rpycbench.utils.profiler
Signature:
create_profiled_connection(
host: str = 'localhost',
port: int = 18812,
telemetry_inst: Optional[RPyCTelemetry] = None,
auto_print_on_slow: bool = False,
auto_print_on_deep: bool = False,
**rpyc_config
) -> ProfiledConnection
Parameters:
host(str): Server hostname
port(int): Server port
telemetry_inst(Optional[RPyCTelemetry]): Telemetry instance to use
auto_print_on_slow(bool): Auto-print on slow calls
auto_print_on_deep(bool): Auto-print on deep stacks
**rpyc_config: Additional RPyC configuration options
Returns: ProfiledConnection
Example:
from rpycbench.utils.profiler import create_profiled_connection
conn = create_profiled_connection(
host='localhost',
port=18812,
auto_print_on_slow=True
)
result = conn.root.some_method()
conn.telemetry.print_summary()
conn.close()
profile_rpyc_calls is a context manager for profiling RPyC operations.
Location: rpycbench.utils.profiler
Signature:
profile_rpyc_calls(
connection: rpyc.Connection,
print_summary: bool = False,
**telemetry_kwargs
) -> ProfiledConnection
Parameters:
connection(rpyc.Connection): Existing RPyC connection to profile
print_summary(bool): Whether to automatically print summary on exit
**telemetry_kwargs: Arguments to pass to RPyCTelemetry constructor
Returns: ProfiledConnection (as context manager)
Example:
from rpycbench.utils.profiler import profile_rpyc_calls
import rpyc
conn = rpyc.connect('localhost', 18812)
with profile_rpyc_calls(conn, print_summary=True, slow_call_threshold=0.05) as profiled:
    result = profiled.root.some_method()
# Summary printed automatically on exit
Location: rpycbench.utils.telemetry
get_telemetry(): Get the global telemetry instance.
from rpycbench.utils.telemetry import get_telemetry
telemetry = get_telemetry()
stats = telemetry.get_statistics()
Returns: RPyCTelemetry
enable_telemetry(...): Enable global telemetry with specified settings.
from rpycbench.utils.telemetry import enable_telemetry
enable_telemetry(
slow_call_threshold=0.1,
track_netrefs=True,
track_stacks=True
)
Parameters: Same as RPyCTelemetry constructor
Returns: RPyCTelemetry
disable_telemetry(): Disable global telemetry.
from rpycbench.utils.telemetry import disable_telemetry
disable_telemetry()
telemetry_context(...): Context manager for temporary telemetry.
from rpycbench.utils.telemetry import telemetry_context
with telemetry_context(slow_call_threshold=0.05) as telemetry:
    # Operations tracked
    pass
# Telemetry disabled after context
Parameters: Same as RPyCTelemetry constructor
Returns: Context manager yielding RPyCTelemetry
format_call_tree formats call history as a tree structure.
Location: rpycbench.utils.visualizer
Signature:
format_call_tree(
telemetry: RPyCTelemetry,
max_depth: Optional[int] = None,
min_duration: float = 0.0,
show_netrefs: bool = True,
) -> str
Parameters:
telemetry(RPyCTelemetry): Telemetry instance with call history
max_depth(Optional[int]): Maximum depth to display. None for unlimited
min_duration(float): Minimum call duration to include (seconds)
show_netrefs(bool): Whether to show NetRef information
Returns: str (formatted tree)
Example:
from rpycbench.utils.visualizer import format_call_tree
tree = format_call_tree(
telemetry,
max_depth=5,
min_duration=0.01,
show_netrefs=True
)
print(tree)
format_timeline formats call history as a timeline.
Location: rpycbench.utils.visualizer
Signature:
format_timeline(
telemetry: RPyCTelemetry,
width: int = 80,
min_duration: float = 0.0,
) -> str
Parameters:
telemetry(RPyCTelemetry): Telemetry instance
width(int): Width of timeline in characters
min_duration(float): Minimum duration to include
Returns: str (formatted timeline)
Example:
from rpycbench.utils.visualizer import format_timeline
timeline = format_timeline(telemetry, width=100)
print(timeline)
format_netref_report generates a NetRef usage report.
Location: rpycbench.utils.visualizer
Signature:
format_netref_report(telemetry: RPyCTelemetry) -> str
Parameters:
telemetry(RPyCTelemetry): Telemetry instance
Returns: str (formatted report)
Example:
from rpycbench.utils.visualizer import format_netref_report
report = format_netref_report(telemetry)
print(report)
format_slow_calls_report generates a report of slow calls sorted by duration.
Location: rpycbench.utils.visualizer
Signature:
format_slow_calls_report(
telemetry: RPyCTelemetry,
top_n: int = 20,
) -> str
Parameters:
telemetry(RPyCTelemetry): Telemetry instance
top_n(int): Number of top slow calls to include
Returns: str (formatted report)
Example:
from rpycbench.utils.visualizer import format_slow_calls_report
report = format_slow_calls_report(telemetry, top_n=10)
print(report)
format_full_report generates a comprehensive profiling report.
Location: rpycbench.utils.visualizer
Signature:
format_full_report(
telemetry: RPyCTelemetry,
include_tree: bool = True,
include_timeline: bool = False,
include_netrefs: bool = True,
include_slow_calls: bool = True,
) -> str
Parameters:
telemetry(RPyCTelemetry): Telemetry instance
include_tree(bool): Include call tree
include_timeline(bool): Include timeline
include_netrefs(bool): Include NetRef report
include_slow_calls(bool): Include slow calls report
Returns: str (formatted report)
Example:
from rpycbench.utils.visualizer import format_full_report
report = format_full_report(
telemetry,
include_tree=True,
include_timeline=False,
include_netrefs=True,
include_slow_calls=True
)
print(report)
RPyCServer manages the lifecycle of a local RPyC server.
Location: rpycbench.servers.rpyc_servers
Constructor:
RPyCServer(
host: str = 'localhost',
port: int = 18812,
mode: str = 'threaded',
auto_register: bool = False
)
Parameters:
host(str): Host to bind to
port(int): Port to bind to
mode(str): Server mode ('threaded', 'forking', 'oneshot')
auto_register(bool): Whether to auto-register with RPyC registry
Methods:
start(): Start the server in a separate process.
stop(): Stop the server.
Example:
from rpycbench.servers.rpyc_servers import RPyCServer
# As context manager
with RPyCServer(port=18812, mode='threaded') as server:
    # Server running
    # ... run tests ...
    pass
# Server stopped
# Or manually
server = RPyCServer(port=18812, mode='threaded')
server.start()
# ... run tests ...
server.stop()
RemoteRPyCServer manages an RPyC server on a remote host via SSH.
Location: rpycbench.remote.ssh
Constructor:
RemoteRPyCServer(
remote_host: str,
host: str = 'localhost',
port: int = 18812,
mode: str = 'threaded',
ssh_port: int = 22,
verbose: bool = True,
)
Parameters:
remote_host(str): Remote host in format 'user@hostname'
host(str): Host for RPyC server to bind to on remote machine
port(int): Port for RPyC server
mode(str): Server mode ('threaded', 'forking', 'oneshot')
ssh_port(int): SSH port
verbose(bool): Whether to print deployment progress
Methods:
start(): Deploy code to the remote host and start the server.
Automatically:
- Creates deployment archive
- Uploads via SSH
- Installs dependencies on remote host
- Starts server process
- Caches deployment (only redeploys if code changed)
stop(): Stop the remote server.
Example:
from rpycbench.remote.ssh import RemoteRPyCServer
# As context manager
with RemoteRPyCServer(
remote_host='user@remote-server.example.com',
port=18812,
mode='threaded'
) as server:
    # Server deployed and running on remote host
    # Connect from local machine:
    conn = rpyc.connect('remote-server.example.com', 18812)
    # ... run tests ...
    conn.close()
# Server stopped and cleaned up
# Or manually
server = RemoteRPyCServer(remote_host='user@remote.com', port=18812)
server.start()
# ... run tests ...
server.stop()
create_rpyc_connection is a factory for creating RPyC connections with standard configuration.
Location: rpycbench.utils.connection
Signature:
create_rpyc_connection(
host: str = 'localhost',
port: int = 18812,
timeout: int = 5
) -> rpyc.Connection
Parameters:
host(str): Server hostname
port(int): Server port
timeout(int): Connection timeout in seconds
Returns: rpyc.Connection
Example:
from rpycbench.utils.connection import create_rpyc_connection
conn = create_rpyc_connection('localhost', 18812, timeout=10)
result = conn.root.some_method()
conn.close()
BenchmarkSuite orchestrates a complete benchmark suite comparing RPyC vs HTTP across multiple dimensions.
Location: rpycbench.benchmarks.suite
Constructor:
BenchmarkSuite(
rpyc_host: str = 'localhost',
rpyc_port: int = 18812,
http_host: str = 'localhost',
http_port: int = 5000,
remote_host: Optional[str] = None,
)
Parameters:
rpyc_host(str): RPyC server hostname
rpyc_port(int): RPyC server port
http_host(str): HTTP server hostname
http_port(int): HTTP server port
remote_host(Optional[str]): Remote host for SSH deployment ('user@hostname')
Methods:
Run complete benchmark suite.
run_all(
test_rpyc_threaded: bool = True,
test_rpyc_forking: bool = False,
test_http: bool = False,
num_serial_connections: int = 100,
num_requests: int = 1000,
num_parallel_clients: int = 128,
requests_per_client: int = 100,
test_binary_transfer: bool = False,
binary_file_sizes: Optional[List[int]] = None,
binary_chunk_size: int = 64 * 1024,
binary_iterations: int = 3,
) -> BenchmarkResults
Parameters:
test_rpyc_threaded(bool): Test RPyC in threaded mode
test_rpyc_forking(bool): Test RPyC in forking mode
test_http(bool): Test HTTP/REST
num_serial_connections(int): Sample size for connection benchmarks
num_requests(int): Sample size for latency benchmarks
num_parallel_clients(int): Number of concurrent clients
requests_per_client(int): Requests per concurrent client
test_binary_transfer(bool): Whether to test large file transfers
binary_file_sizes(Optional[List[int]]): File sizes for binary transfer tests
binary_chunk_size(int): Chunk size for binary transfers
binary_iterations(int): Iterations for binary transfer tests
Returns: BenchmarkResults
Example:
from rpycbench.benchmarks.suite import BenchmarkSuite
# Local testing
suite = BenchmarkSuite(
rpyc_host='localhost',
rpyc_port=18812,
http_host='localhost',
http_port=5000
)
results = suite.run_all(
test_rpyc_threaded=True,
test_rpyc_forking=True,
test_http=True,
num_requests=1000,
num_parallel_clients=128
)
results.print_summary()
# Remote testing
remote_suite = BenchmarkSuite(
rpyc_host='remote.example.com',
rpyc_port=18812,
remote_host='user@remote.example.com'
)
remote_results = remote_suite.run_all(
test_rpyc_threaded=True,
num_requests=1000
)
remote_results.print_summary()