- Diagnosis and Profiling
- Benchmarking Patterns
- Application Integration
- Optimization Techniques
- Testing and Validation
Use this decision tree to systematically diagnose performance issues:
START: Poor RPyC Performance
│
├─► Check Success Rate (metrics.failed_requests / metrics.total_requests)
│ │
│ ├─► < 95% success rate?
│ │ └─► SERVER EXHAUSTION
│ │ Actions:
│ │ - Check server logs for connection errors
│ │ - Increase ulimits: ulimit -n 65536
│ │ - Reduce num_clients
│ │ - Implement connection pooling
│ │
│ └─► ≥ 95% success rate → Continue
│
├─► Measure Concurrent Performance (use ConcurrentBenchmark)
│ │
│ ├─► Calculate P99/P50 ratio
│ │ │
│ │ ├─► Ratio > 10?
│ │ │ └─► SEVERE RESOURCE CONTENTION
│ │ │ Check CPU usage → If high: GIL contention
│ │ │ Check CPU usage → If low: I/O bottleneck
│ │ │
│ │ └─► Ratio < 5 → Consistent performance, check absolute latency
│ │
│ └─► Check CPU Usage (stats['cpu_usage']['mean'])
│ │
│ ├─► CPU > 80%?
│ │ └─► HIGH CPU USAGE
│ │ │
│ │ ├─► P99/P50 > 5?
│ │ │ └─► GIL CONTENTION (threaded mode)
│ │ │ Actions:
│ │ │ - Switch to server_mode='forking'
│ │ │ - Verify CPU-bound workload (hashing, computation)
│ │ │ - Expected: Multi-core CPU usage in forking mode
│ │ │
│ │ └─► P99/P50 < 3
│ │ └─► SERVER AT CAPACITY (but stable)
│ │ Actions:
│ │ - Scale horizontally (more servers)
│ │ - Optimize hot code paths
│ │ - Profile CPU-intensive operations
│ │
│ └─► CPU < 30%?
│ └─► LOW CPU USAGE
│ │
│ ├─► P99 > 100ms?
│ │ └─► I/O BOTTLENECK
│ │ Actions:
│ │ - Profile database queries (add indexes, optimize)
│ │ - Check network latency to backend services
│ │ - Implement connection pooling
│ │ - Consider async I/O
│ │ - Threaded mode is fine (GIL released during I/O)
│ │
│ └─► P99 < 100ms
│ └─► NETWORK LATENCY
│ - Check geographic distance client ↔ server
│ - Measure baseline latency (LatencyBenchmark)
│ - Reduce round trips (batch operations)
│
└─► Profile Individual Calls (use profile_rpyc_calls)
│
├─► stats['total_calls'] > 100?
│ └─► EXCESSIVE ROUND TRIPS
│ Actions:
│ - Batch multiple calls into one
│ - Return all needed data in single call
│ - See "Reduce Round Trips by Batching" pattern
│
├─► stats['netrefs_created'] > 20?
│ └─► NETREF OVERHEAD
│ Actions:
│ - Return dicts instead of objects
│ - Use rpyc.utils.classic.obtain() to copy objects
│ - See "Eliminate NetRef Overhead" pattern
│
└─► stats['max_stack_depth'] > 3?
└─► DEEP CALL NESTING
Actions:
- Flatten remote call structure
- Move nested logic server-side
- See "Flatten Deep Call Stacks" pattern
Quick Reference Table:
| Symptom | Root Cause | Action |
|---|---|---|
| Success rate < 95% | Server exhaustion | Increase ulimits, reduce load |
| CPU > 80% + P99/P50 > 5 | GIL contention | Switch to forking mode |
| CPU < 30% + P99 > 100ms | I/O bottleneck | Optimize I/O, connection pooling |
| P99/P50 > 10 | Resource contention | Check CPU, try forking, scale |
| total_calls > 100 | Excessive round trips | Batch calls, return all data at once |
| netrefs_created > 20 | NetRef overhead | Return primitives, use obtain() |
| max_stack_depth > 3 | Deep call nesting | Flatten structure, server-side logic |
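The thresholds in the tree and table above can also be applied mechanically. The sketch below is a hypothetical helper (the name `diagnose` and its parameters are illustrative and not part of rpycbench); it assumes latencies are given in seconds and rates in percent, and it simply returns every diagnosis whose threshold is met.

```python
def diagnose(success_rate, p50, p99, cpu_percent,
             total_calls=0, netrefs_created=0, max_stack_depth=0):
    """Return the diagnoses from the quick reference table that apply.

    Hypothetical helper: latencies are in seconds, success_rate and
    cpu_percent are percentages (0-100).
    """
    findings = []
    if success_rate < 95:
        findings.append("server exhaustion")
    # Tail ratio: how much slower the slowest 1% is than the median
    ratio = p99 / p50 if p50 > 0 else 0.0
    if ratio > 10:
        findings.append("resource contention")
    if cpu_percent > 80 and ratio > 5:
        findings.append("GIL contention")
    if cpu_percent < 30 and p99 > 0.1:
        findings.append("I/O bottleneck")
    if total_calls > 100:
        findings.append("excessive round trips")
    if netrefs_created > 20:
        findings.append("NetRef overhead")
    if max_stack_depth > 3:
        findings.append("deep call nesting")
    return findings


print(diagnose(success_rate=99.2, p50=0.004, p99=0.030, cpu_percent=92))
```

Several findings can apply at once, so the helper returns a list rather than stopping at the first match.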
Problem: A specific RPyC method takes several seconds but you don't know why.
Solution:
import rpyc
from rpycbench.utils.profiler import profile_rpyc_calls, RPyCTelemetry
from rpycbench.utils.visualizer import (
    format_call_tree,
    format_slow_calls_report,
    format_netref_report
)
# Custom telemetry to catch moderately slow calls
telemetry = RPyCTelemetry(
    slow_call_threshold=0.05,  # Flag calls > 50ms
    deep_stack_threshold=3,  # Flag stacks > 3 deep
    track_netrefs=True,
    track_stacks=True
)
conn = rpyc.connect('localhost', 18812)
with profile_rpyc_calls(conn, telemetry_inst=telemetry) as profiled:
    result = profiled.root.process_large_dataset(dataset_id)
# Print comprehensive diagnosis
print("\n=== SLOW CALLS ===")
print(format_slow_calls_report(telemetry, top_n=20))
print("\n=== CALL TREE ===")
print(format_call_tree(telemetry, show_netrefs=True))
print("\n=== NETREF REPORT ===")
print(format_netref_report(telemetry))
# Get statistics
stats = telemetry.get_statistics()
print(f"\nDiagnosis Summary:")
print(f" Total calls: {stats['total_calls']}")
print(f" Network round trips: {stats['total_calls']}")
print(f" Slow calls: {stats['slow_calls']}")
print(f" NetRefs created: {stats['netrefs_created']}")
print(f" Max stack depth: {stats['max_stack_depth']}")
# Identify the problem
if stats['total_calls'] > 100:
    print("\n PROBLEM: Excessive round trips - consider batching calls")
if stats['netrefs_created'] > 20:
    print(" PROBLEM: NetRef overhead - return primitives instead of objects")
if stats['max_stack_depth'] > 3:
    print(" PROBLEM: Deep call nesting - flatten structure or move logic server-side")
Expected output:
=== SLOW CALLS ===
Top 20 Slow Calls (threshold: 50.0ms)
1. fetch_record: 234.5ms (called 156 times, 36.6s total)
2. validate_data: 87.3ms (called 156 times, 13.6s total)
3. calculate_metrics: 45.2ms (called 1 time, 45.2ms total)
PROBLEM: Excessive round trips - consider batching calls
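The report above is useful mainly because it groups calls by method: a call that is only moderately slow but repeated 156 times dominates the total. As a rough sketch of that aggregation, assuming the raw data is a list of `(method_name, duration_seconds)` tuples (a hypothetical format, not rpycbench's internal representation):

```python
from collections import defaultdict


def summarize_calls(calls, threshold_s=0.05):
    """Group (name, duration) samples by name and rank slow ones by total time."""
    totals = defaultdict(lambda: [0, 0.0])  # name -> [count, total seconds]
    for name, duration in calls:
        entry = totals[name]
        entry[0] += 1
        entry[1] += duration
    rows = []
    for name, (count, total) in totals.items():
        mean = total / count
        if mean > threshold_s:  # keep only methods that are slow on average
            rows.append((name, mean, count, total))
    # Largest cumulative cost first: this is where the time actually went
    rows.sort(key=lambda row: row[3], reverse=True)
    return rows


calls = [("fetch_record", 0.2)] * 3 + [("validate_data", 0.1)] * 2 + [("ping", 0.001)]
for name, mean, count, total in summarize_calls(calls):
    print(f"{name}: {mean * 1000:.1f}ms (called {count} times, {total:.1f}s total)")
```

Sorting by cumulative time rather than per-call time is a design choice here; it tends to surface candidates for batching first.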
Problem: Your operation is making hundreds of network calls when it should make one or two.
Solution:
import rpyc
from rpycbench.utils.profiler import profile_rpyc_calls
conn = rpyc.connect('localhost', 18812)
with profile_rpyc_calls(conn) as profiled:
    # Your operation
    users = profiled.root.get_users()
    for user in users:
        # ANTI-PATTERN: Each iteration makes a network call
        email = user.email  # Network call
        name = user.name  # Network call
        status = user.status  # Network call
stats = profiled.telemetry.get_statistics()
print(f"Total round trips: {stats['total_calls']}")
# If you see 300+ calls for 100 users, you have the problem
# Fix: Return primitive data instead
Fix:
# Server-side: Return dict instead of objects
class MyService(rpyc.Service):
    def exposed_get_users(self):
        users = fetch_users_from_db()
        # Return primitive data, not objects
        return [
            {'email': u.email, 'name': u.name, 'status': u.status}
            for u in users
        ]
# Client-side: Now this is just 1 network call
conn = rpyc.connect('localhost', 18812)
users = conn.root.get_users()  # Single call returns all data
for user in users:
    email = user['email']  # Local dict access, no network
    name = user['name']  # Local dict access, no network
    status = user['status']  # Local dict access, no network
Problem: You're not sure if NetRefs are causing performance issues.
Solution:
import rpyc
from rpycbench.utils.profiler import profile_rpyc_calls, RPyCTelemetry
from rpycbench.utils.visualizer import format_netref_report
telemetry = RPyCTelemetry(track_netrefs=True)
conn = rpyc.connect('localhost', 18812)
with profile_rpyc_calls(conn, telemetry_inst=telemetry) as profiled:
    result = profiled.root.your_operation()
print(format_netref_report(telemetry))
stats = telemetry.get_statistics()
if stats['netrefs_created'] > 0:
    print(f"\nWARNING: {stats['netrefs_created']} NetRefs created")
    print("Each NetRef access causes a network round trip")
    print("Consider using rpyc.utils.classic.obtain() to copy objects locally")
Fix options:
import rpyc
from rpyc.utils.classic import obtain
conn = rpyc.connect('localhost', 18812)
# Option 1: Use obtain() to copy the entire object tree locally
user = obtain(conn.root.get_user(123))
# Now user is a local object, all attribute access is local
email = user.email # No network call
name = user.name # No network call
# Option 2: Return primitive data from server (preferred)
user_dict = conn.root.get_user_dict(123) # Server returns dict
email = user_dict['email'] # Local access
name = user_dict['name']  # Local access
Problem: Remote methods are calling other remote methods, creating latency multiplication.
Solution:
import rpyc
from rpycbench.utils.profiler import profile_rpyc_calls, RPyCTelemetry
from rpycbench.utils.visualizer import format_call_tree
telemetry = RPyCTelemetry(
    track_stacks=True,
    deep_stack_threshold=3  # Warn when depth > 3
)
conn = rpyc.connect('localhost', 18812)
with profile_rpyc_calls(conn, telemetry_inst=telemetry) as profiled:
    result = profiled.root.complex_operation()
# Visualize call nesting
print(format_call_tree(telemetry, max_depth=None))
stats = telemetry.get_statistics()
if stats['max_stack_depth'] > 3:
    print(f"\nWARNING: Max call depth of {stats['max_stack_depth']}")
    print("Each level adds network latency - consider flattening")
Fix:
# BEFORE: Nested remote calls (4x network latency)
class MyService(rpyc.Service):
    def exposed_method_a(self):
        return self.exposed_method_b()  # Remote call
    def exposed_method_b(self):
        return self.exposed_method_c()  # Remote call
    def exposed_method_c(self):
        return self.exposed_method_d()  # Remote call
    def exposed_method_d(self):
        return "result"
# AFTER: Flat call (1x network latency)
class MyService(rpyc.Service):
    def exposed_method_a(self):
        # Do all the work in one method
        result_b = self._internal_method_b()  # Local call
        result_c = self._internal_method_c(result_b)  # Local call
        result_d = self._internal_method_d(result_c)  # Local call
        return result_d
    def _internal_method_b(self):
        return "step_b"
    def _internal_method_c(self, input_b):
        return "step_c"
    def _internal_method_d(self, input_c):
        return "result"
Problem: You made changes but want to verify they actually improved performance.
Solution:
from rpycbench.core.benchmark import LatencyBenchmark
from rpycbench.core.metrics import BenchmarkResults
import rpyc
def test_old_implementation(conn):
    return conn.root.old_method()
def test_new_implementation(conn):
    return conn.root.new_method()
# Benchmark old implementation
old_bench = LatencyBenchmark(
    name="old_implementation",
    protocol="rpyc",
    server_mode="threaded",
    connection_factory=lambda: rpyc.connect('localhost', 18812),
    request_func=test_old_implementation,
    num_requests=1000
)
# Benchmark new implementation
new_bench = LatencyBenchmark(
    name="new_implementation",
    protocol="rpyc",
    server_mode="threaded",
    connection_factory=lambda: rpyc.connect('localhost', 18812),
    request_func=test_new_implementation,
    num_requests=1000
)
old_metrics = old_bench.run()
new_metrics = new_bench.run()
# Compare results
results = BenchmarkResults()
results.add_result(old_metrics)
results.add_result(new_metrics)
results.print_summary()
# Calculate improvement
old_stats = old_metrics.compute_statistics()
new_stats = new_metrics.compute_statistics()
improvement = ((old_stats['latency']['mean'] - new_stats['latency']['mean'])
               / old_stats['latency']['mean'] * 100)
print(f"\nPerformance improvement: {improvement:.1f}%")
print(f"Old P99: {old_stats['latency']['p99']*1000:.2f}ms")
print(f"New P99: {new_stats['latency']['p99']*1000:.2f}ms")
Problem: Need to know how long it takes to establish RPyC connections.
Solution:
from rpycbench.core.benchmark import ConnectionBenchmark
import rpyc
bench = ConnectionBenchmark(
    name="connection_time",
    protocol="rpyc",
    server_mode="threaded",
    connection_factory=lambda: rpyc.connect('localhost', 18812, config={
        'allow_public_attrs': True,
        'sync_request_timeout': 30
    }),
    num_connections=100
)
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Connection time statistics:")
print(f" Mean: {stats['connection_time']['mean']*1000:.2f}ms")
print(f" Median: {stats['connection_time']['median']*1000:.2f}ms")
print(f" P95: {stats['connection_time']['p95']*1000:.2f}ms")
print(f" P99: {stats['connection_time']['p99']*1000:.2f}ms")
print(f" Min: {stats['connection_time']['min']*1000:.2f}ms")
print(f" Max: {stats['connection_time']['max']*1000:.2f}ms")
# If P99 > 100ms, consider connection pooling
if stats['connection_time']['p99'] > 0.1:
    print("\nRECOMMENDATION: Connection time is high, use connection pooling")
Problem: Need to understand the latency distribution of your RPyC calls, especially tail latency.
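To make the terminology concrete: P50 is the median latency and P99 is the value that 99% of requests stay at or below, so a handful of slow outliers moves P99 long before it moves the median. The sketch below uses the nearest-rank definition as an assumption; rpycbench may interpolate differently, so treat the helper `percentile` as illustrative only.

```python
import math
import statistics


def percentile(samples, pct):
    """Nearest-rank percentile: smallest sample with at least pct% at or below it."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


# 98 fast requests plus two slow outliers (seconds)
latencies = [0.010] * 98 + [0.050, 0.200]
p50 = percentile(latencies, 50)
p99 = percentile(latencies, 99)
print(f"mean: {statistics.mean(latencies) * 1000:.1f}ms")
print(f"P50:  {p50 * 1000:.1f}ms")
print(f"P99:  {p99 * 1000:.1f}ms")
print(f"P99/P50 ratio: {p99 / p50:.1f}x")
```

Only two requests in a hundred are slow here, yet the tail ratio already reaches the warning range used elsewhere in this guide, while the median is unaffected.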
Solution:
from rpycbench.core.benchmark import LatencyBenchmark
import rpyc
def my_operation(conn):
    return conn.root.fetch_data(query_id=123)
bench = LatencyBenchmark(
    name="data_fetch",
    protocol="rpyc",
    server_mode="threaded",
    connection_factory=lambda: rpyc.connect('localhost', 18812),
    request_func=my_operation,
    num_requests=10000,  # Large sample for accurate distribution
    warmup_requests=100  # Exclude warmup from stats
)
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Latency distribution for {metrics.total_requests} requests:")
print(f" Mean: {stats['latency']['mean']*1000:.2f}ms")
print(f" Median (P50): {stats['latency']['median']*1000:.2f}ms")
print(f" P95: {stats['latency']['p95']*1000:.2f}ms")
print(f" P99: {stats['latency']['p99']*1000:.2f}ms")
print(f" Max: {stats['latency']['max']*1000:.2f}ms")
print(f" Std Dev: {stats['latency']['stdev']*1000:.2f}ms")
# Check for tail latency issues
if stats['latency']['p99'] > stats['latency']['median'] * 5:
    print("\nWARNING: High tail latency (P99 >> P50)")
    print("This indicates inconsistent performance - investigate outliers")
Problem: Need to understand how payload size affects transfer performance.
Solution:
from rpycbench.core.benchmark import BandwidthBenchmark
import rpyc
# Define data sizes to test
data_sizes = [
    1024,  # 1 KB
    10 * 1024,  # 10 KB
    100 * 1024,  # 100 KB
    1024 * 1024,  # 1 MB
    10 * 1024 * 1024  # 10 MB
]
bench = BandwidthBenchmark(
    name="payload_size_test",
    protocol="rpyc",
    server_mode="threaded",
    connection_factory=lambda: rpyc.connect('localhost', 18812),
    upload_func=lambda c, data: c.root.receive_data(data),
    download_func=lambda c, size: c.root.send_data(size),
    data_sizes=data_sizes,
    iterations=20
)
metrics = bench.run()
stats = metrics.compute_statistics()
print(f"Upload bandwidth:")
print(f" Mean: {stats['upload_bandwidth']['mean']/1024/1024:.2f} MB/s")
print(f" P95: {stats['upload_bandwidth']['p95']/1024/1024:.2f} MB/s")
print(f"\nDownload bandwidth:")
print(f" Mean: {stats['download_bandwidth']['mean']/1024/1024:.2f} MB/s")
print(f" P95: {stats['download_bandwidth']['p95']/1024/1024:.2f} MB/s")
# Export detailed results for analysis
import json
with open('bandwidth_results.json', 'w') as f:
    json.dump(metrics.to_dict(), f, indent=2)
Problem: Need to test how your RPyC server handles many simultaneous clients.
Solution:
from rpycbench.core.benchmark import ConcurrentBenchmark
import rpyc
def client_workload(conn):
    # Simulate realistic client workload
    conn.root.authenticate(user_id=123)
    data = conn.root.fetch_user_data()
    conn.root.update_status("active")
    return data
# Test increasing concurrency levels
for num_clients in [10, 50, 100, 200, 500]:
    bench = ConcurrentBenchmark(
        name=f"{num_clients}_clients",
        protocol="rpyc",
        server_mode="threaded",
        connection_factory=lambda: rpyc.connect('localhost', 18812),
        request_func=client_workload,
        num_clients=num_clients,
        requests_per_client=50,
        track_per_connection=False  # Set True to debug individual connections
    )
    metrics = bench.run()
    stats = metrics.compute_statistics()
    success_rate = (1 - metrics.failed_requests/metrics.total_requests) * 100
    print(f"\n{num_clients} concurrent clients:")
    print(f" Success rate: {success_rate:.1f}%")
    print(f" Mean latency: {stats['latency']['mean']*1000:.2f}ms")
    print(f" P99 latency: {stats['latency']['p99']*1000:.2f}ms")
    print(f" Total requests: {metrics.total_requests}")
    print(f" Failed requests: {metrics.failed_requests}")
    # Stop if performance degrades significantly
    if success_rate < 95 or stats['latency']['p99'] > 5.0:
        print(f"\n LIMIT REACHED at {num_clients} clients")
        break
Problem: Need to optimize large file transfers and find the best chunk size.
Solution:
from rpycbench.core.benchmark import BinaryTransferBenchmark
import rpyc
# Test different chunk sizes
for chunk_size in [16*1024, 64*1024, 256*1024, 1024*1024]:
    bench = BinaryTransferBenchmark(
        name=f"chunk_{chunk_size//1024}kb",
        protocol="rpyc",
        server_mode="threaded",
        connection_factory=lambda: rpyc.connect('localhost', 18812),
        upload_func=lambda c, data: c.root.upload_file(data),
        download_func=lambda c, size: c.root.download_file(size),
        upload_chunked_func=lambda c, chunks: c.root.upload_chunked(chunks),
        download_chunked_func=lambda c, size, cs: c.root.download_chunked(size, cs),
        file_sizes=[10*1024*1024, 100*1024*1024],  # 10MB, 100MB
        chunk_size=chunk_size,
        iterations=5,
        test_upload=True,
        test_download=True,
        test_chunked=True
    )
    metrics = bench.run()
    stats = metrics.compute_statistics()
    print(f"\nChunk size: {chunk_size//1024}KB")
    print(f" Upload: {stats['upload_bandwidth']['mean']/1024/1024:.2f} MB/s")
    print(f" Download: {stats['download_bandwidth']['mean']/1024/1024:.2f} MB/s")
Problem: Want to monitor RPyC performance in production without significant overhead.
Solution:
import rpyc
from rpycbench.utils.profiler import RPyCTelemetry, ProfiledConnection
import logging
# Global telemetry instance with conservative settings
telemetry = RPyCTelemetry(
    enabled=True,
    track_netrefs=False,  # Disable for lower overhead
    track_stacks=False,  # Disable for lower overhead
    slow_call_threshold=1.0  # Only track calls > 1 second
)
def create_monitored_connection(host, port):
    """Connection factory with built-in monitoring"""
    base_conn = rpyc.connect(host, port)
    return ProfiledConnection(
        base_conn,
        telemetry_inst=telemetry,
        auto_print_on_slow=True,  # Log slow calls automatically
        auto_print_on_deep=False
    )
# Use in application
conn = create_monitored_connection('localhost', 18812)
try:
    result = conn.root.important_operation()
finally:
    # Periodic reporting (e.g., every 1000 requests)
    stats = telemetry.get_statistics()
    if stats['total_calls'] % 1000 == 0:
        logging.info(f"RPyC stats: {stats['total_calls']} calls, "
                     f"{stats['slow_calls']} slow calls")
    conn.close()
Problem: Want to measure specific operations in your application code.
Solution:
from rpycbench.core.benchmark import BenchmarkContext
import rpyc
conn = rpyc.connect('localhost', 18812)
# Create context for each logical operation
user_ctx = BenchmarkContext(
    name="user_operations",
    protocol="rpyc",
    measure_latency=True,
    measure_system=True
)
order_ctx = BenchmarkContext(
    name="order_operations",
    protocol="rpyc",
    measure_latency=True,
    measure_system=True
)
# Track user operations
with user_ctx.measure_request(bytes_sent=512, bytes_received=4096):
    user = conn.root.get_user(user_id=123)
    user_ctx.record_request(success=True)
# Track order operations
with order_ctx.measure_request(bytes_sent=2048, bytes_received=8192):
    try:
        order = conn.root.create_order(user_id=123, items=[...])
        order_ctx.record_request(success=True)
    except Exception:
        order_ctx.record_request(success=False)
# Analyze results
user_stats = user_ctx.get_results().compute_statistics()
order_stats = order_ctx.get_results().compute_statistics()
print(f"User operations: {user_stats['latency']['mean']*1000:.2f}ms avg")
print(f"Order operations: {order_stats['latency']['mean']*1000:.2f}ms avg")
Problem: Want to collect and visualize RPyC performance metrics over time.
Solution:
from rpycbench.core.benchmark import BenchmarkContext
from rpycbench.core.metrics import BenchmarkMetrics
import json
import time
from datetime import datetime
import rpyc
class PerformanceMonitor:
    def __init__(self, output_file='metrics.jsonl'):
        self.output_file = output_file
        self.contexts = {}
    def get_context(self, operation_name):
        if operation_name not in self.contexts:
            self.contexts[operation_name] = BenchmarkContext(
                name=operation_name,
                protocol="rpyc",
                measure_latency=True,
                measure_system=True
            )
        return self.contexts[operation_name]
    def track_operation(self, operation_name, func, *args, **kwargs):
        ctx = self.get_context(operation_name)
        start_time = time.time()
        try:
            with ctx.measure_request(bytes_sent=0, bytes_received=0):
                result = func(*args, **kwargs)
            ctx.record_request(success=True)
            return result
        except Exception as e:
            ctx.record_request(success=False)
            raise
    def flush_metrics(self):
        """Write metrics to file in JSON Lines format"""
        timestamp = datetime.utcnow().isoformat()
        with open(self.output_file, 'a') as f:
            for name, ctx in self.contexts.items():
                metrics = ctx.get_results()
                stats = metrics.compute_statistics()
                record = {
                    'timestamp': timestamp,
                    'operation': name,
                    'stats': stats,
                    'total_requests': metrics.total_requests,
                    'failed_requests': metrics.failed_requests
                }
                f.write(json.dumps(record) + '\n')
# Usage
monitor = PerformanceMonitor()
conn = rpyc.connect('localhost', 18812)
# Automatic tracking
result = monitor.track_operation('get_user', conn.root.get_user, 123)
result = monitor.track_operation('create_order', conn.root.create_order, {...})
# Periodic flushing (e.g., every minute)
import threading
def periodic_flush():
    monitor.flush_metrics()
    threading.Timer(60, periodic_flush).start()
periodic_flush()
Problem: Want to be notified when RPyC performance degrades.
Solution:
from rpycbench.core.benchmark import LatencyBenchmark
import rpyc
import smtplib
from email.message import EmailMessage
class PerformanceWatchdog:
    def __init__(self, baseline_p99_ms=50):
        self.baseline_p99 = baseline_p99_ms / 1000  # Convert to seconds
        self.alert_threshold = self.baseline_p99 * 2  # Alert if 2x baseline
    def check_performance(self):
        bench = LatencyBenchmark(
            name="health_check",
            protocol="rpyc",
            server_mode="threaded",
            connection_factory=lambda: rpyc.connect('localhost', 18812),
            request_func=lambda c: c.root.health_check(),
            num_requests=100
        )
        metrics = bench.run()
        stats = metrics.compute_statistics()
        current_p99 = stats['latency']['p99']
        if current_p99 > self.alert_threshold:
            self.send_alert(
                f"RPyC performance degraded: "
                f"P99 latency {current_p99*1000:.2f}ms "
                f"(baseline: {self.baseline_p99*1000:.2f}ms)"
            )
            return False
        return True
    def send_alert(self, message):
        # Send email, page, or log alert
        print(f"ALERT: {message}")
        # Implement your alerting mechanism here
# Run periodically
watchdog = PerformanceWatchdog(baseline_p99_ms=50)
import time  # Needed for time.sleep() below
import schedule
schedule.every(5).minutes.do(watchdog.check_performance)
while True:
    schedule.run_pending()
    time.sleep(60)
Problem: Making many small RPyC calls instead of batching them.
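A back-of-the-envelope model shows why this matters. The sketch below assumes each call costs one fixed network round trip plus a small per-item server cost; the function name `total_time` and the numbers (2 ms round trip, 0.1 ms per item) are illustrative assumptions, not measurements.

```python
def total_time(num_items, rtt_s, per_item_server_s, batch_size=1):
    """Estimated wall time when items are fetched batch_size at a time."""
    round_trips = -(-num_items // batch_size)  # ceiling division
    return round_trips * rtt_s + num_items * per_item_server_s


unbatched = total_time(100, rtt_s=0.002, per_item_server_s=0.0001)
batched = total_time(100, rtt_s=0.002, per_item_server_s=0.0001, batch_size=100)
print(f"100 separate calls: {unbatched * 1000:.1f}ms")
print(f"1 batched call:     {batched * 1000:.1f}ms")
```

In this model the server does the same amount of work either way; the saving comes entirely from paying the round-trip cost once instead of once per item.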
Before:
# SLOW: 100 network round trips
conn = rpyc.connect('localhost', 18812)
for user_id in range(100):
    user = conn.root.get_user(user_id)
    print(user)
After:
# FAST: 1 network round trip
conn = rpyc.connect('localhost', 18812)
users = conn.root.get_users_batch(list(range(100)))
for user in users:
    print(user)
Server implementation:
class MyService(rpyc.Service):
    def exposed_get_users_batch(self, user_ids):
        # Fetch all users in one database query
        users = db.query(User).filter(User.id.in_(user_ids)).all()
        # Return primitive data, not objects
        return [
            {'id': u.id, 'name': u.name, 'email': u.email}
            for u in users
        ]
Verify improvement:
from rpycbench.utils.profiler import profile_rpyc_calls
# Measure before
with profile_rpyc_calls(conn) as profiled:
    for user_id in range(100):
        user = profiled.root.get_user(user_id)
before_stats = profiled.telemetry.get_statistics()
print(f"Before: {before_stats['total_calls']} round trips")
# Measure after
profiled.telemetry.reset()
with profile_rpyc_calls(conn) as profiled:
    users = profiled.root.get_users_batch(list(range(100)))
after_stats = profiled.telemetry.get_statistics()
print(f"After: {after_stats['total_calls']} round trips")
print(f"Reduction: {(1 - after_stats['total_calls']/before_stats['total_calls'])*100:.0f}%")
Problem: Returning objects creates NetRefs which cause network overhead on every attribute access.
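The effect can be imitated without a server. The toy class below is a stand-in for a remote reference, not RPyC's actual netref implementation: it simply counts every proxied attribute read as one round trip. `CountingProxy` and `User` are hypothetical names used only for this illustration.

```python
class CountingProxy:
    """Toy stand-in for a remote reference: each proxied attribute read
    is counted as one network round trip."""

    def __init__(self, target):
        self._target = target
        self.round_trips = 0

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself,
        # i.e. the attributes that live on the "remote" object.
        self.round_trips += 1
        return getattr(self._target, name)


class User:
    def __init__(self, name, email, status):
        self.name = name
        self.email = email
        self.status = status


users = [CountingProxy(User(f"user{i}", f"user{i}@example.com", "active"))
         for i in range(100)]
for user in users:
    _ = (user.name, user.email, user.status)  # three "remote" reads each

total = sum(user.round_trips for user in users)
print(f"Simulated round trips for 100 users: {total}")
```

Three attribute reads on each of 100 proxied users add up to 300 simulated round trips, which is the pattern the profiler output above helps to spot.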
Before:
# Server
class User:
    def __init__(self, id, name, email):
        self.id = id
        self.name = name
        self.email = email
class MyService(rpyc.Service):
    def exposed_get_user(self, user_id):
        return User(user_id, "John", "john@example.com")  # Returns object
# Client (SLOW)
conn = rpyc.connect('localhost', 18812)
user = conn.root.get_user(123)  # 1 network call
name = user.name  # NETWORK CALL!
email = user.email  # NETWORK CALL!
After:
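# Server
class MyService(rpyc.Service):
    def exposed_get_user(self, user_id):
        user = fetch_user_from_db(user_id)
        # Return dict, not object
        return {
            'id': user.id,
            'name': user.name,
            'email': user.email
        }
# Client (FAST)
conn = rpyc.connect('localhost', 18812)
user = conn.root.get_user(123)  # 1 network call
# Caveat: RPyC passes mutable containers such as dict and list by reference.
# If item access must be purely local, copy the result first (for example
# with rpyc.utils.classic.obtain()) or return a tuple or string instead.
name = user['name']  # Local dict access
email = user['email']  # Local dict access
Alternative using obtain():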
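from rpyc.utils.classic import obtain
# If you can't change the server
# Note: obtain() works by pickling the remote object, so pickling must be
# allowed in the connection config (allow_pickle=True) and the object's
# class must be importable on the client.
conn = rpyc.connect('localhost', 18812)
user = obtain(conn.root.get_user(123))  # Copies entire object tree locally
name = user.name  # Local access
email = user.email  # Local access
Problem: Remote methods calling other remote methods creates latency multiplication.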
Before:
# Server with nested remote calls
class MyService(rpyc.Service):
    def exposed_get_user_report(self, user_id):
        user = self.exposed_get_user(user_id)  # Call 1
        orders = self.exposed_get_orders(user_id)  # Call 2
        metrics = self.exposed_calculate_metrics(user, orders)  # Call 3
        return metrics
    def exposed_get_user(self, user_id):
        return db.query(User).get(user_id)
    def exposed_get_orders(self, user_id):
        return db.query(Order).filter_by(user_id=user_id).all()
    def exposed_calculate_metrics(self, user, orders):
        return {'total': len(orders), 'user': user.name}
# Client (SLOW: 4 round trips for one operation)
conn = rpyc.connect('localhost', 18812)
report = conn.root.get_user_report(123)
After:
# Server with flat structure
class MyService(rpyc.Service):
    def exposed_get_user_report(self, user_id):
        # Do all work server-side with internal methods
        user = self._get_user(user_id)  # Local call
        orders = self._get_orders(user_id)  # Local call
        metrics = self._calculate_metrics(user, orders)  # Local call
        return metrics
    def _get_user(self, user_id):
        return db.query(User).get(user_id)
    def _get_orders(self, user_id):
        return db.query(Order).filter_by(user_id=user_id).all()
    def _calculate_metrics(self, user, orders):
        return {'total': len(orders), 'user': user.name}
# Client (FAST: 1 round trip)
conn = rpyc.connect('localhost', 18812)
report = conn.root.get_user_report(123)
Problem: Default chunk sizes may not be optimal for your network conditions.
Solution: Benchmark different chunk sizes to find the optimum.
from rpycbench.core.benchmark import BinaryTransferBenchmark
import rpyc
chunk_sizes = [
    8 * 1024,  # 8 KB
    16 * 1024,  # 16 KB
    64 * 1024,  # 64 KB (RPyC default)
    256 * 1024,  # 256 KB
    1024 * 1024,  # 1 MB
]
results = []
for chunk_size in chunk_sizes:
    bench = BinaryTransferBenchmark(
        name=f"chunk_{chunk_size//1024}kb",
        protocol="rpyc",
        server_mode="threaded",
        connection_factory=lambda: rpyc.connect('localhost', 18812),
        upload_func=lambda c, data: c.root.upload(data),
        download_func=lambda c, size: c.root.download(size),
        file_sizes=[100 * 1024 * 1024],  # 100 MB
        chunk_size=chunk_size,
        iterations=3
    )
    metrics = bench.run()
    stats = metrics.compute_statistics()
    throughput = stats['upload_bandwidth']['mean'] / 1024 / 1024
    results.append((chunk_size // 1024, throughput))
    print(f"Chunk {chunk_size//1024}KB: {throughput:.2f} MB/s")
# Find optimal
optimal = max(results, key=lambda x: x[1])
print(f"\nOptimal chunk size: {optimal[0]}KB ({optimal[1]:.2f} MB/s)")
Problem: Many parallel clients are slow, but you don't know if it's CPU saturation, I/O wait, or network issues.
Solution: Create test workloads that isolate CPU-bound vs I/O-bound behavior, then benchmark under concurrent load.
import hashlib
import time
from rpycbench.core.benchmark import ConcurrentBenchmark
from rpycbench.servers.rpyc_servers import RPyCServer
import rpyc
# Server implementations
class CPUBoundService(rpyc.Service):
    def exposed_cpu_work(self, iterations=100000):
        """CPU-intensive: hash computation loop"""
        result = b"start"
        for i in range(iterations):
            result = hashlib.sha256(result).digest()
        return result.hex()[:16]
class IOBoundService(rpyc.Service):
    def exposed_io_work(self, delay=0.01):
        """I/O-bound: simulates database query or file I/O"""
        time.sleep(delay)  # Simulates I/O wait
        return "data"
# Test CPU-bound workload with THREADED mode
print("=== CPU-Bound Workload ===")
with RPyCServer(port=18812, mode='threaded') as server:
    cpu_threaded = ConcurrentBenchmark(
        name="cpu_threaded",
        protocol="rpyc",
        server_mode="threaded",
        connection_factory=lambda: rpyc.connect('localhost', 18812),
        request_func=lambda c: c.root.cpu_work(),
        num_clients=128,
        requests_per_client=10
    )
    cpu_threaded_metrics = cpu_threaded.run()
    cpu_threaded_metrics.record_system_metrics()  # Capture CPU/memory
    cpu_threaded_stats = cpu_threaded_metrics.compute_statistics()
# Test CPU-bound workload with FORKING mode
with RPyCServer(port=18813, mode='forking') as server:
    cpu_forking = ConcurrentBenchmark(
        name="cpu_forking",
        protocol="rpyc",
        server_mode="forking",
        connection_factory=lambda: rpyc.connect('localhost', 18813),
        request_func=lambda c: c.root.cpu_work(),
        num_clients=128,
        requests_per_client=10
    )
    cpu_forking_metrics = cpu_forking.run()
    cpu_forking_metrics.record_system_metrics()
    cpu_forking_stats = cpu_forking_metrics.compute_statistics()
# Test I/O-bound workload with THREADED mode
print("\n=== I/O-Bound Workload ===")
with RPyCServer(port=18814, mode='threaded') as server:
    io_threaded = ConcurrentBenchmark(
        name="io_threaded",
        protocol="rpyc",
        server_mode="threaded",
        connection_factory=lambda: rpyc.connect('localhost', 18814),
        request_func=lambda c: c.root.io_work(),
        num_clients=128,
        requests_per_client=10
    )
    io_threaded_metrics = io_threaded.run()
    io_threaded_metrics.record_system_metrics()
    io_threaded_stats = io_threaded_metrics.compute_statistics()
# Test I/O-bound workload with FORKING mode
with RPyCServer(port=18815, mode='forking') as server:
    io_forking = ConcurrentBenchmark(
        name="io_forking",
        protocol="rpyc",
        server_mode="forking",
        connection_factory=lambda: rpyc.connect('localhost', 18815),
        request_func=lambda c: c.root.io_work(),
        num_clients=128,
        requests_per_client=10
    )
    io_forking_metrics = io_forking.run()
    io_forking_metrics.record_system_metrics()
    io_forking_stats = io_forking_metrics.compute_statistics()
# Compare results
print("\nCPU-Bound Results:")
print(f" Threaded: {cpu_threaded_stats['latency']['mean']*1000:.2f}ms avg, "
      f"CPU: {cpu_threaded_stats['cpu_usage']['mean']:.1f}%")
print(f" Forking: {cpu_forking_stats['latency']['mean']*1000:.2f}ms avg, "
      f"CPU: {cpu_forking_stats['cpu_usage']['mean']:.1f}%")
print(f" Improvement: {(1 - cpu_forking_stats['latency']['mean']/cpu_threaded_stats['latency']['mean'])*100:.1f}%")
print("\nI/O-Bound Results:")
print(f" Threaded: {io_threaded_stats['latency']['mean']*1000:.2f}ms avg, "
      f"CPU: {io_threaded_stats['cpu_usage']['mean']:.1f}%")
print(f" Forking: {io_forking_stats['latency']['mean']*1000:.2f}ms avg, "
      f"CPU: {io_forking_stats['cpu_usage']['mean']:.1f}%")
print(f" Difference: {((io_forking_stats['latency']['mean']/io_threaded_stats['latency']['mean'])-1)*100:.1f}%")
Interpretation Guide:
| Symptom | Diagnosis | Fix |
|---|---|---|
| CPU-bound threaded: High single-core CPU (80-100%), poor scaling | GIL contention - Python's Global Interpreter Lock prevents parallel execution | Switch to forking mode |
| CPU-bound forking: Multi-core CPU usage, much faster than threaded | Proper parallelization, no GIL | Keep forking mode |
| I/O-bound: Low CPU usage (< 30%), high latency | I/O wait time dominates | Threaded is fine, optimize I/O operations |
| I/O-bound: Similar performance in both modes | I/O wait, not CPU, is bottleneck | Use threaded (lower memory overhead) |
| High P99 >> P50 in threaded mode | Resource contention / GIL queuing | Try forking or reduce concurrency |
Key Insight:
- CPU-bound + threaded = GIL wall: Only one thread executes Python code at a time, others wait
- I/O-bound + threaded = good: During I/O wait, GIL is released, other threads can run
- Forking bypasses GIL: Each process has its own Python interpreter, but costs more memory
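The three points above can be turned into a rough estimate. The model below is deliberately crude and rests on stated assumptions: I/O waits overlap fully across workers in both modes, CPU work is fully serialized in threaded mode, and forking spreads CPU work evenly over the available cores. The function `estimated_wall_time` and all numbers are hypothetical; real servers will deviate from them.

```python
import math


def estimated_wall_time(num_requests, cpu_s, io_s, workers, cores, mode):
    """Crude wall-time estimate for num_requests identical requests.

    Assumptions: I/O waits overlap across workers in both modes; CPU work
    is serialized in threaded mode (one thread holds the GIL at a time)
    and spread over min(workers, cores) processes in forking mode.
    """
    io_time = math.ceil(num_requests / workers) * io_s
    if mode == "threaded":
        cpu_time = num_requests * cpu_s
    elif mode == "forking":
        cpu_time = num_requests * cpu_s / min(workers, cores)
    else:
        raise ValueError(f"unknown mode: {mode}")
    return io_time + cpu_time


# 128 clients x 10 requests each, on a hypothetical 8-core machine
for label, cpu_s, io_s in [("CPU-bound", 0.05, 0.0), ("I/O-bound", 0.00001, 0.01)]:
    threaded = estimated_wall_time(1280, cpu_s, io_s, workers=128, cores=8, mode="threaded")
    forking = estimated_wall_time(1280, cpu_s, io_s, workers=128, cores=8, mode="forking")
    print(f"{label}: threaded {threaded:.2f}s, forking {forking:.2f}s")
```

Under these assumptions the CPU-bound workload benefits from forking roughly in proportion to the core count, while the I/O-bound workload finishes in about the same time in either mode.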
Problem: Poor concurrent performance, but unclear if the server or clients are the bottleneck.
Solution: Use per-connection tracking and analyze patterns.
from rpycbench.core.benchmark import ConcurrentBenchmark
import rpyc
# Enable per-connection tracking
bench = ConcurrentBenchmark(
name="bottleneck_diagnosis",
protocol="rpyc",
server_mode="threaded",
connection_factory=lambda: rpyc.connect('localhost', 18812),
request_func=lambda c: c.root.some_method(),
num_clients=128,
requests_per_client=100,
track_per_connection=True # Track individual client performance
)
metrics = bench.run()
stats = metrics.compute_statistics()
# Analyze patterns
# Guard against an empty run before dividing
total = metrics.total_requests
success_rate = (1 - metrics.failed_requests / total) * 100 if total else 0.0
p50 = stats['latency']['median']
p99 = stats['latency']['p99']
ratio = p99 / p50 if p50 > 0 else 0
print("Overall Metrics:")
print(f" Success rate: {success_rate:.1f}%")
print(f" P50 latency: {p50*1000:.2f}ms")
print(f" P99 latency: {p99*1000:.2f}ms")
print(f" P99/P50 ratio: {ratio:.1f}x")
print(f" CPU usage: {stats['cpu_usage']['mean']:.1f}%")
print(f" Memory usage: {stats['memory_usage']['mean']/1024/1024:.0f}MB")
# Diagnosis
print("\nDiagnosis:")
if success_rate < 95:
    print(" SERVER EXHAUSTION: < 95% success rate indicates server is dropping connections")
    print(" - Check server logs for 'too many open files' or connection errors")
    print(" - Increase server ulimits: ulimit -n 65536")
    print(" - Reduce num_clients or switch to connection pooling")
if ratio > 10:
    print(" RESOURCE CONTENTION: P99 >> P50 indicates inconsistent performance")
    print(" - If CPU-bound: Switch to forking mode to avoid GIL contention")
    print(" - If I/O-bound: Server may be hitting I/O limits (file descriptors, database connections)")
    print(" - Check server-side resource usage separately")
if stats['cpu_usage']['mean'] > 80:
    print(" CPU SATURATION: High CPU usage indicates compute bottleneck")
    if ratio > 5:
        print(" - Likely GIL contention in threaded mode")
        print(" - Try forking mode or optimize CPU-intensive operations")
    else:
        print(" - Server handling load but at capacity")
        print(" - Scale horizontally or optimize hot code paths")
if stats['cpu_usage']['mean'] < 30 and p99 > 0.1:
    print(" I/O BOTTLENECK: Low CPU + high latency = waiting on I/O")
    print(" - Profile database queries or external API calls")
    print(" - Consider async I/O or connection pooling")
    print(" - Check network latency between server and backend services")
# Per-connection analysis (if track_per_connection=True)
if hasattr(metrics, 'per_connection_latencies'):
    import statistics
    # Mean latency per connection, skipping connections with no samples
    conn_means = [statistics.mean(latencies) for latencies in metrics.per_connection_latencies if latencies]
    # Spread of those means across connections (standard deviation, in seconds)
    conn_stdev = statistics.stdev(conn_means) if len(conn_means) > 1 else 0
    print(f"\nPer-Connection Std Dev: {conn_stdev*1000:.2f}ms")
    if conn_stdev > p50 * 0.5:
        print(" CLIENT/NETWORK ISSUE: High variance between connections")
        print(" - Some clients experiencing much worse performance than others")
        print(" - Check client-side resource limits or network path diversity")
    else:
        print(" SERVER BOTTLENECK: All clients affected equally")
        print(" - Server-side issue affecting all connections uniformly")
Pattern Recognition:
| Pattern | Root Cause | Action |
|---|---|---|
| All clients slow uniformly, P99/P50 < 3 | Server at steady capacity | Scale server or optimize operations |
| Random clients slow, high variance | Client or network issues | Investigate client-side resources, network paths |
| Increasing slowdown with more clients | Server resource exhaustion | Check CPU, memory, file descriptors; reduce concurrency |
| < 95% success rate | Connection/port exhaustion | Increase ulimits, use connection pooling |
| High CPU + high P99/P50 | GIL contention (threaded mode) | Switch to forking for CPU-bound work |
| Low CPU + high latency | I/O wait (database, disk, network) | Optimize I/O operations, consider async |
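The table can also be applied mechanically. The sketch below is one possible encoding of a few of its rows as a pure function, so the rules can be unit-tested separately from a benchmark run. The name `diagnose` and its return labels are hypothetical; the thresholds (95% success, 80% / 30% CPU, P99/P50 of 5, 100ms P99) are the ones used elsewhere in this guide, not universal constants.

```python
def diagnose(success_rate, p50, p99, cpu_mean):
    """Map benchmark statistics to pattern labels from the table.

    success_rate and cpu_mean are percentages; p50 and p99 are in seconds.
    """
    findings = []
    ratio = p99 / p50 if p50 > 0 else 0.0

    if success_rate < 95:
        findings.append("connection/port exhaustion")
    if cpu_mean > 80:
        # High CPU: tail latency separates GIL queuing from steady saturation
        if ratio > 5:
            findings.append("GIL contention")
        else:
            findings.append("server at capacity")
    elif cpu_mean < 30 and p99 > 0.1:
        # Low CPU with slow requests: time is spent waiting, not computing
        findings.append("I/O wait")
    return findings or ["no pattern matched"]


if __name__ == "__main__":
    # Illustrative inputs, not measured results
    print(diagnose(success_rate=99.8, p50=0.004, p99=0.060, cpu_mean=92.0))
```

The rows that depend on per-connection variance or on how latency changes with client count need more than one summary per run, so they are left out of this sketch.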
Problem: Threaded server mode may not be optimal for your workload.
Background: Understanding the Python GIL:
Python's Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecode simultaneously. This has major implications for concurrent RPyC servers:
- Threaded mode: All client connections share one Python interpreter with one GIL
  - CPU-bound work: Only one thread executes at a time → poor scaling, high latency
  - I/O-bound work: During I/O (network, disk, database), GIL is released → good scaling
  - Memory efficient: Threads share memory space
- Forking mode: Each client connection gets its own process with its own GIL
  - CPU-bound work: True parallelism, utilizes all CPU cores → excellent scaling
  - I/O-bound work: No better than threaded (I/O still waits) → memory overhead not worth it
  - Memory intensive: Each process has its own address space, so memory use grows with the number of connections
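Because forking costs memory, it only pays off when it buys a clear latency improvement. As a rough sketch of that trade-off, the helper below compares P99 latency from two statistics dictionaries shaped like the output of `compute_statistics()` in this guide. The function name `recommend_server_mode` and the 20% `min_gain` threshold are assumptions chosen for illustration; pick a threshold that reflects your own memory budget.

```python
def recommend_server_mode(threaded_stats, forking_stats, min_gain=0.20):
    """Return 'forking' only if it cuts P99 latency by at least `min_gain`.

    Both arguments are dicts with a ['latency']['p99'] entry in seconds.
    """
    threaded_p99 = threaded_stats["latency"]["p99"]
    forking_p99 = forking_stats["latency"]["p99"]
    if threaded_p99 <= 0:
        return "threaded"  # nothing to improve on; keep the cheaper mode
    gain = (threaded_p99 - forking_p99) / threaded_p99
    return "forking" if gain >= min_gain else "threaded"


if __name__ == "__main__":
    # Illustrative numbers only, not measured results
    cpu_bound = ({"latency": {"p99": 0.250}}, {"latency": {"p99": 0.080}})
    io_bound = ({"latency": {"p99": 0.120}}, {"latency": {"p99": 0.115}})
    print("CPU-bound workload:", recommend_server_mode(*cpu_bound))
    print("I/O-bound workload:", recommend_server_mode(*io_bound))
```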
Solution: Benchmark different server modes and interpret CPU usage patterns.
from rpycbench.core.benchmark import ConcurrentBenchmark
import rpyc
def test_workload(conn):
    return conn.root.cpu_intensive_task()
# Test threaded mode
threaded_bench = ConcurrentBenchmark(
name="threaded",
protocol="rpyc",
server_mode="threaded",
connection_factory=lambda: rpyc.connect('localhost', 18812),
request_func=test_workload,
num_clients=100,
requests_per_client=50
)
# Test forking mode
forking_bench = ConcurrentBenchmark(
name="forking",
protocol="rpyc",
server_mode="forking",
connection_factory=lambda: rpyc.connect('localhost', 18813), # Different port
request_func=test_workload,
num_clients=100,
requests_per_client=50
)
threaded_metrics = threaded_bench.run()
forking_metrics = forking_bench.run()
threaded_stats = threaded_metrics.compute_statistics()
forking_stats = forking_metrics.compute_statistics()
print(f"Threaded mode P99: {threaded_stats['latency']['p99']*1000:.2f}ms")
print(f"Forking mode P99: {forking_stats['latency']['p99']*1000:.2f}ms")
# Recommendations:
# - Threaded: Good for I/O bound tasks, lower memory
# - Forking: Good for CPU bound tasks, isolated memory, higher overhead
Problem: Want to ensure performance doesn't degrade in CI/CD pipeline.
Solution:
from rpycbench.core.benchmark import LatencyBenchmark
import rpyc
import sys
def performance_test():
    bench = LatencyBenchmark(
        name="regression_test",
        protocol="rpyc",
        server_mode="threaded",
        connection_factory=lambda: rpyc.connect('localhost', 18812),
        request_func=lambda c: c.root.critical_operation(),
        num_requests=1000
    )
    metrics = bench.run()
    stats = metrics.compute_statistics()
    # Define performance SLA
    MAX_P99_MS = 100
    MAX_MEAN_MS = 50
    p99_ms = stats['latency']['p99'] * 1000
    mean_ms = stats['latency']['mean'] * 1000
    print(f"P99 latency: {p99_ms:.2f}ms (threshold: {MAX_P99_MS}ms)")
    print(f"Mean latency: {mean_ms:.2f}ms (threshold: {MAX_MEAN_MS}ms)")
    if p99_ms > MAX_P99_MS:
        print(f"FAIL: P99 latency {p99_ms:.2f}ms exceeds {MAX_P99_MS}ms")
        sys.exit(1)
    if mean_ms > MAX_MEAN_MS:
        print(f"FAIL: Mean latency {mean_ms:.2f}ms exceeds {MAX_MEAN_MS}ms")
        sys.exit(1)
    print("PASS: Performance within acceptable limits")
    return 0
if __name__ == '__main__':
    sys.exit(performance_test())
In CI/CD (e.g., GitHub Actions):
- name: Run performance regression tests
  run: |
    python start_test_server.py &
    sleep 2
    python performance_regression.py
Problem: Evaluating whether RPyC is the right choice for your application.
Solution:
from rpycbench.benchmarks.suite import BenchmarkSuite
suite = BenchmarkSuite(
rpyc_host='localhost',
rpyc_port=18812,
http_host='localhost',
http_port=5000
)
results = suite.run_all(
test_rpyc_threaded=True,
test_rpyc_forking=False,
test_http=True,
num_requests=1000,
num_parallel_clients=100
)
results.print_summary()
# Analyze results
comparison = results.get_comparison_table()
rpyc_latency = comparison['rpyc_threaded']['latency_mean']
http_latency = comparison['http']['latency_mean']
if rpyc_latency < http_latency:
    print(f"\nRPyC is {(http_latency/rpyc_latency - 1)*100:.1f}% faster than HTTP")
else:
    print(f"\nHTTP is {(rpyc_latency/http_latency - 1)*100:.1f}% faster than RPyC")
    print("Consider investigating NetRef overhead and call patterns")
Problem: Need to test performance over real network conditions with remote servers.
Solution:
from rpycbench.benchmarks.suite import BenchmarkSuite
# Test with remote server via SSH
suite = BenchmarkSuite(
rpyc_host='remote-server.example.com',
rpyc_port=18812,
remote_host='user@remote-server.example.com' # Enables SSH deployment
)
# Automatically deploys rpycbench to remote host and starts server
results = suite.run_all(
test_rpyc_threaded=True,
num_requests=1000
)
results.print_summary()
# Compare local vs remote performance
local_suite = BenchmarkSuite(rpyc_host='localhost', rpyc_port=18812)
local_results = local_suite.run_all(test_rpyc_threaded=True, num_requests=1000)
local_latency = local_results.results[0].compute_statistics()['latency']['mean']
remote_latency = results.results[0].compute_statistics()['latency']['mean']
network_overhead = (remote_latency - local_latency) * 1000
print(f"\nNetwork overhead: {network_overhead:.2f}ms")
Problem: Made optimizations but need to prove they work with statistical significance.
Solution:
from rpycbench.core.benchmark import LatencyBenchmark
from rpycbench.core.metrics import BenchmarkResults
import rpyc
import scipy.stats
def benchmark_implementation(name, method_name, num_requests=1000):
    bench = LatencyBenchmark(
        name=name,
        protocol="rpyc",
        server_mode="threaded",
        connection_factory=lambda: rpyc.connect('localhost', 18812),
        request_func=lambda c: getattr(c.root, method_name)(),
        num_requests=num_requests
    )
    return bench.run()
# Benchmark both implementations multiple times
before_runs = [benchmark_implementation("before", "old_method") for _ in range(5)]
after_runs = [benchmark_implementation("after", "new_method") for _ in range(5)]
# Extract latencies
before_latencies = [m.compute_statistics()['latency']['mean'] for m in before_runs]
after_latencies = [m.compute_statistics()['latency']['mean'] for m in after_runs]
# Statistical test (Welch's t-test: does not assume both sets of runs have equal variance)
t_stat, p_value = scipy.stats.ttest_ind(before_latencies, after_latencies, equal_var=False)
before_mean = sum(before_latencies) / len(before_latencies)
after_mean = sum(after_latencies) / len(after_latencies)
improvement = ((before_mean - after_mean) / before_mean) * 100
print(f"Before: {before_mean*1000:.2f}ms ± {scipy.stats.sem(before_latencies)*1000:.2f}ms")
print(f"After: {after_mean*1000:.2f}ms ± {scipy.stats.sem(after_latencies)*1000:.2f}ms")
print(f"Improvement: {improvement:.1f}%")
print(f"p-value: {p_value:.6f}")
if p_value < 0.05:
    print("Result is statistically significant (p < 0.05)")
else:
    print("Result is NOT statistically significant")
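With only five runs per implementation, a t-test leans heavily on its normality assumption. As a cross-check, an exact permutation test on the difference in means makes no distributional assumption and needs only the standard library. The sketch below is one way to do it; `permutation_p_value` is an illustrative name, not part of rpycbench or SciPy. It enumerates every way of splitting the pooled run means into two groups and counts how often the split is at least as extreme as the observed one.

```python
from itertools import combinations
from statistics import mean


def permutation_p_value(before, after):
    """Exact two-sided permutation test on the difference in group means."""
    pooled = list(before) + list(after)
    n_before = len(before)
    n_after = len(pooled) - n_before
    observed = abs(mean(before) - mean(after))
    pooled_sum = sum(pooled)

    extreme = 0
    splits = 0
    for idx in combinations(range(len(pooled)), n_before):
        group_sum = sum(pooled[i] for i in idx)
        diff = abs(group_sum / n_before - (pooled_sum - group_sum) / n_after)
        # Small tolerance so float rounding does not drop the observed split
        if diff >= observed - 1e-12:
            extreme += 1
        splits += 1
    return extreme / splits


if __name__ == "__main__":
    # Illustrative per-run mean latencies in milliseconds, not measured results
    before_ms = [10, 11, 12, 13, 14]
    after_ms = [1, 2, 3, 4, 5]
    print(f"permutation p-value: {permutation_p_value(before_ms, after_ms):.4f}")
```

For five runs per side there are 252 possible splits, so the smallest achievable two-sided p-value is 2/252 (about 0.008). If you need finer resolution, increase the number of runs rather than the number of requests per run.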