Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/default/cuckoo.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ upload_max_size = 100000000
# Prevent upload of files that exceed upload_max_size?
do_upload_max_size = no

# Enable multi-worker mode: one ResultServer process per VM
multiworker = no
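# Base port for auto-assigning per-VM ports (multiworker mode).
# A machine without its own resultserver_port gets base_port plus its position
# in the machines list: the first gets base_port, the second base_port+1, etc.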
base_port = 2042

[processing]
# Set the maximum size of analyses generated files to process. This is used
# to avoid the processing of big files which may take a lot of processing
Expand Down
19 changes: 13 additions & 6 deletions lib/cuckoo/common/abstracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,11 @@ def initialize(self) -> None:

def _initialize(self) -> None:
"""Read configuration."""
multiworker_enabled = cfg.resultserver.get("multiworker", False)
base_port = cfg.resultserver.get("base_port", 2042)

mmanager_opts = self.options.get(self.module_name)
for machine_id in mmanager_opts["machines"]:
for machine_index, machine_id in enumerate(mmanager_opts["machines"]):
try:
machine_opts = self.options.get(machine_id.strip())
machine = Dictionary()
Expand All @@ -176,12 +179,16 @@ def _initialize(self) -> None:
machine.resultserver_ip = machine_opts.get("resultserver_ip", cfg.resultserver.ip)
machine.resultserver_port = machine_opts.get("resultserver_port")
if machine.resultserver_port is None:
# The ResultServer port might have been dynamically changed,
# get it from the ResultServer singleton. Also avoid import
# recursion issues by importing ResultServer here.
from lib.cuckoo.core.resultserver import ResultServer
if multiworker_enabled:
# In multiworker mode, auto-assign sequential ports
machine.resultserver_port = base_port + machine_index
else:
# The ResultServer port might have been dynamically changed,
# get it from the ResultServer singleton. Also avoid import
# recursion issues by importing ResultServer here.
from lib.cuckoo.core.resultserver import ResultServer

machine.resultserver_port = ResultServer().port
machine.resultserver_port = ResultServer().port

# Strip parameters.
for key, value in machine.items():
Expand Down
20 changes: 10 additions & 10 deletions lib/cuckoo/core/analysis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,15 +583,15 @@ def route_network(self):
self.machine.ip,
str(routing.inetsim.server),
str(routing.inetsim.dnsport),
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.inetsim.ports),
)

elif self.route == "tor":
self.rooter_response = rooter(
"socks5_enable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.tor.dnsport),
str(routing.tor.proxyport),
)
Expand All @@ -600,14 +600,14 @@ def route_network(self):
self.rooter_response = rooter(
"socks5_enable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(self.socks5s[self.route]["dnsport"]),
str(self.socks5s[self.route]["port"]),
)
self.rooter_response = rooter("libvirt_fwo_enable", self.machine.interface, self.machine.ip)

elif self.route in ("none", "None", "drop"):
self.rooter_response = rooter("drop_enable", self.machine.ip, str(self.cfg.resultserver.port))
self.rooter_response = rooter("drop_enable", self.machine.ip, str(self.machine.resultserver_port))
elif self.route[:3] == "tun" and is_network_interface(self.route):
self.log.info("Network interface %s is tunnel", self.interface)
self.rooter_response = rooter("interface_route_tun_enable", self.machine.ip, self.route, str(self.task.id))
Expand Down Expand Up @@ -648,7 +648,7 @@ def route_network(self):
self.machine.ip,
self.cfg.resultserver.ip,
"tcp",
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
)
self.rooter_response = rooter(
"forward_enable", input_interface, self.machine.interface, self.cfg.resultserver.ip, self.machine.ip
Expand Down Expand Up @@ -696,7 +696,7 @@ def unroute_network(self):
self.machine.ip,
self.cfg.resultserver.ip,
"tcp",
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
)
self.rooter_response = rooter(
"forward_disable", input_interface, self.machine.interface, self.cfg.resultserver.ip, self.machine.ip
Expand All @@ -720,15 +720,15 @@ def unroute_network(self):
self.machine.ip,
routing.inetsim.server,
str(routing.inetsim.dnsport),
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.inetsim.ports),
)

elif self.route == "tor":
self.rooter_response = rooter(
"socks5_disable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(routing.tor.dnsport),
str(routing.tor.proxyport),
)
Expand All @@ -737,14 +737,14 @@ def unroute_network(self):
self.rooter_response = rooter(
"socks5_disable",
self.machine.ip,
str(self.cfg.resultserver.port),
str(self.machine.resultserver_port),
str(self.socks5s[self.route]["dnsport"]),
str(self.socks5s[self.route]["port"]),
)
self.rooter_response = rooter("libvirt_fwo_disable", self.machine.interface, self.machine.ip)

elif self.route in ("none", "None", "drop"):
self.rooter_response = rooter("drop_disable", self.machine.ip, str(self.cfg.resultserver.port))
self.rooter_response = rooter("drop_disable", self.machine.ip, str(self.machine.resultserver_port))
elif self.route[:3] == "tun":
self.log.info("Disable tunnel interface: %s", self.interface)
self.rooter_response = rooter("interface_route_tun_disable", self.machine.ip, self.route, str(self.task.id))
Expand Down
2 changes: 2 additions & 0 deletions lib/cuckoo/core/data/machines.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
raise CuckooDependencyError("Unable to import sqlalchemy (install with `poetry install`)")

MACHINE_RUNNING = "running"
MACHINE_POWEROFF = "poweroff"

log = logging.getLogger(__name__)
web_conf = Config("web")
Expand Down Expand Up @@ -340,6 +341,7 @@ def unlock_machine(self, machine: Machine) -> Machine:
"""
machine.locked = False
machine.locked_changed_on = _utcnow_naive()
self.set_machine_status(machine, MACHINE_POWEROFF)
self.session.merge(machine)
return machine

Expand Down
203 changes: 200 additions & 3 deletions lib/cuckoo/core/resultserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import errno
import json
import logging
import multiprocessing
import os
import signal
import socket
import struct
from contextlib import suppress
Expand Down Expand Up @@ -598,13 +600,159 @@ def negotiate_protocol(self, task_id, ctx):
return klass(task_id, ctx, version)


class SingleVMResultServerWorker(GeventResultServerWorker):
    """Result server worker that serves exactly one VM (multiworker mode).

    The task being served is not resolved from the client IP address.
    It is read from a shared ``multiprocessing.Value``; a falsy value
    (``0``) is treated as "no task is active".
    """

    def __init__(self, sock, shared_task_id, vm_ip, **kwargs):
        """Wrap ``sock`` and remember the shared task id and the VM address.

        Args:
            sock: Listening socket, passed on to the parent worker class.
            shared_task_id: Shared value whose ``.value`` is the id of the
                task currently assigned to this VM.
            vm_ip: Address of the VM; only used in log messages here.
            **kwargs: Forwarded unchanged to the parent worker class.
        """
        super().__init__(sock, **kwargs)
        self._shared_task_id = shared_task_id
        self._vm_ip = vm_ip

    def handle(self, sock, addr):
        """Serve one connection on behalf of the task currently assigned.

        The connection is dropped when no task is assigned, when protocol
        negotiation hits EOF or yields no protocol, or when the assigned
        task changed while negotiation was in progress.
        """
        task_id = self._shared_task_id.value
        if not task_id:
            log.warning("Worker for %s has no active task, rejecting connection from %s", self._vm_ip, addr[0])
            return

        self.task_id = task_id
        self.storagepath = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id))
        self.create_folders()

        ctx = HandlerContext(task_id, self.storagepath, sock)
        task_log_start(task_id)
        try:
            try:
                protocol = self.negotiate_protocol(task_id, ctx)
            except EOFError:
                return

            # Negotiation blocks on I/O, so the task we started with may
            # have been replaced in the meantime; check again before serving.
            if self._shared_task_id.value != task_id:
                log.warning(
                    "Task #%d for VM %s was cancelled during negotiation, new task is %s",
                    task_id, self._vm_ip, self._shared_task_id.value or "none",
                )
                return

            if protocol:
                self._serve_protocol(task_id, ctx, protocol)
        finally:
            task_log_stop(task_id)

    def _serve_protocol(self, task_id, ctx, protocol):
        """Run ``protocol`` with ``ctx`` registered so cancel_task can reach it."""
        with self.task_mgmt_lock:
            registered = self.handlers.setdefault(task_id, set())
            registered.add(ctx)

        try:
            with protocol:
                protocol.handle()
        except CuckooOperationalError as e:
            log.exception(e)
        finally:
            with self.task_mgmt_lock:
                registered.discard(ctx)
            ctx.cancel()
            if ctx.buf:
                log.warning("Task #%s with protocol %s has unprocessed data", task_id, protocol)

    def cancel_task(self, task_id):
        """Cancel every handler context registered for ``task_id``."""
        with self.task_mgmt_lock:
            pending = self.handlers.pop(task_id, set())
            for ctx in pending:
                ctx.cancel()
        task_log_stop_force(task_id)


class ResultServerWorkerProcess(multiprocessing.Process):
    """Dedicated ResultServer process for a single VM.

    Each worker runs its own gevent event loop and StreamServer,
    listening on a unique port assigned to one VM. This bypasses
    the GIL limitation of the single-process model by giving each
    VM its own Python process for handling result uploads.
    """

    def __init__(self, ip, port, listen_ip):
        """Prepare (but do not start) the worker process.

        Args:
            ip: Address of the VM this worker serves (used in logs/name).
            port: TCP port the worker binds to.
            listen_ip: Local address the worker binds to.
        """
        super().__init__(daemon=True, name=f"ResultServerWorker-{ip}:{port}")
        self.ip = ip
        self.port = port
        self.listen_ip = listen_ip
        # Current task id, shared between parent and child; 0 = no task.
        self._task_id = multiprocessing.Value("i", 0)
        # Set by the child once the listening socket is bound and the
        # server object exists.
        self._ready = multiprocessing.Event()

    def run(self):
        """Entry point for the worker process. Sets up a standalone
        gevent StreamServer and serves forever.

        Returns early (ending the process) if the port cannot be bound;
        in that case the ready event is never set.
        """
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Reinitialize gevent's event loop after fork to avoid
        # inheriting the parent's loop state (watchers, callbacks, etc.)
        import gevent
        import gevent.socket

        gevent.reinit()
        gevent.get_hub()

        sock = gevent.socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((self.listen_ip, self.port))
        except OSError as e:
            log.error("Worker for %s failed to bind on %s:%s: %s", self.ip, self.listen_ip, self.port, e)
            # Do not leave the unbound socket open on the failure path.
            sock.close()
            return
        sock.listen(32)

        server = SingleVMResultServerWorker(sock, self._task_id, self.ip, spawn="default")
        self._ready.set()
        log.info("ResultServer worker for VM %s started on port %d (pid %d)", self.ip, self.port, os.getpid())
        server.serve_forever()

    def set_task(self, task_id, ready_timeout=10):
        """Register a task with this worker (called from parent process).

        The task id is published first, then the call waits for the
        worker to signal that it is listening.

        Args:
            task_id: Id of the task the worker should serve.
            ready_timeout: Seconds to wait for the worker to become ready.

        Returns:
            True if the worker signalled readiness within the timeout,
            False otherwise (for example when it failed to bind its port).
        """
        self._task_id.value = task_id
        ready = self._ready.wait(timeout=ready_timeout)
        if not ready:
            # Previously the timeout was silently ignored, so a worker that
            # never came up went unnoticed until uploads failed.
            log.warning(
                "ResultServer worker for %s not ready on port %s after %ss (task #%s)",
                self.ip, self.port, ready_timeout, task_id,
            )
        return ready

    def clear_task(self):
        """Clear the current task (called from parent process)."""
        self._task_id.value = 0

    def get_task_id(self):
        """Return the task id currently published to the worker (0 if none)."""
        return self._task_id.value


class ResultServer(metaclass=Singleton):
"""Manager for the ResultServer worker and task state."""
"""Manager for the ResultServer worker and task state.

Supports two modes:
- Legacy (default): Single gevent StreamServer on one port for all VMs
- Multiworker: One dedicated process per VM, each on its own port
"""

def __init__(self):
    """Set up the result server in legacy or multiworker mode.

    Does nothing when no enabled category needs a VM. In multiworker
    mode only bookkeeping is initialised here; in legacy mode the
    single-port server is started.
    """
    if not categories_need_VM:
        return

    resultserver_cfg = cfg.resultserver
    self.multiworker = resultserver_cfg.get("multiworker", False)
    self.ip = resultserver_cfg.ip

    if not self.multiworker:
        self._start_legacy_server()
        return

    # Multiworker mode: map each VM ip to its ResultServerWorkerProcess.
    self.workers = {}
    self.port = resultserver_cfg.get("base_port", 2042)
    log.info("ResultServer starting in multiworker mode (base_port=%d)", self.port)

def _start_legacy_server(self):
"""Start the traditional single-port ResultServer."""
ip = cfg.resultserver.ip
port = cfg.resultserver.port
pool_size = cfg.resultserver.pool_size
Expand Down Expand Up @@ -647,11 +795,60 @@ def __init__(self):

def add_task(self, task, machine):
    """Register a task/machine with the ResultServer.

    Dispatches on the server mode: multiworker mode hands the task to the
    per-VM worker; legacy mode registers it with the single server
    instance. Exactly one of the two paths runs, so ``self.instance`` is
    not touched in multiworker mode and the task is registered only once
    in legacy mode.
    """
    if self.multiworker:
        self._add_task_multiworker(task, machine)
    else:
        self.instance.add_task(task.id, machine.ip)

def del_task(self, task, machine):
    """Delete running task and cancel existing handlers.

    Dispatches on the server mode: multiworker mode clears the task on the
    per-VM worker; legacy mode removes it from the single server instance.
    Exactly one of the two paths runs, so ``self.instance`` is not touched
    in multiworker mode and the task is removed only once in legacy mode.
    """
    if self.multiworker:
        self._del_task_multiworker(task, machine)
    else:
        self.instance.del_task(task.id, machine.ip)

def _add_task_multiworker(self, task, machine):
    """Start or reuse a worker process for this VM and register the task.

    A new worker is spawned when none is recorded for ``machine.ip`` or
    when the recorded one is no longer alive; otherwise the existing
    worker is reused. The task id is then published to the worker.
    """
    worker = self.workers.get(machine.ip)

    # Spawn a new worker if none exists or the previous one died
    if worker is None or not worker.is_alive():
        if worker is not None:
            log.warning("Worker for %s (pid %d) died, spawning replacement", machine.ip, worker.pid)
            worker.close()
        # Convert once: the configured port may be a string, and the
        # "%d" placeholder in the log call below requires a number.
        port = int(machine.resultserver_port)
        worker = ResultServerWorkerProcess(
            ip=machine.ip,
            port=port,
            listen_ip=self.ip,
        )
        worker.start()
        self.workers[machine.ip] = worker
        log.debug("Task #%s: Started worker for %s on port %d", task.id, machine.ip, port)

    worker.set_task(task.id)
    log.debug("Task #%s: Registered with worker for %s", task.id, machine.ip)

def _del_task_multiworker(self, task, machine):
    """Detach the task from this VM's worker, leaving the worker running.

    Only a warning is logged when no live worker is recorded for the VM.
    """
    vm_worker = self.workers.get(machine.ip)
    if not vm_worker or not vm_worker.is_alive():
        log.warning("Task #%s: No alive worker found for %s", task.id, machine.ip)
        return

    vm_worker.clear_task()
    log.debug("Task #%s: Cleared from worker for %s", task.id, machine.ip)

def shutdown_workers(self):
    """Terminate all multiworker processes.

    No-op unless the server is in multiworker mode (also safe when
    ``__init__`` returned before setting ``multiworker``). Every live
    worker is asked to terminate first and only then joined, so workers
    shut down concurrently instead of each one costing up to the full
    join timeout in turn. A worker that survives the grace period is
    killed and joined again so it is reaped.
    """
    if not getattr(self, "multiworker", False):
        return

    alive = [(ip, worker) for ip, worker in self.workers.items() if worker.is_alive()]

    for ip, worker in alive:
        log.info("Terminating ResultServer worker for %s (pid %d)", ip, worker.pid)
        worker.terminate()

    for _ip, worker in alive:
        worker.join(timeout=5)
        if worker.is_alive():
            worker.kill()
            # Reap the killed process so it does not linger as a zombie.
            worker.join(timeout=5)

    self.workers.clear()

def create_server(self, sock, pool_size):
if pool_size:
Expand Down
Loading
Loading