Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,33 @@ time_limit = "00:10:00"
concurrency = 4
request-count = 50

# Shared-node disaggregated smoke test: top-level num_nodes (2) is lower than
# prefill_worker.num-nodes + decode_worker.num-nodes (2 + 2 = 4), so prefill and
# decode workers run on the same allocated nodes instead of on separate ones.
[[Tests]]
id = "test.disagg.shared-node"
test_name = "vLLM"
num_nodes = 2
time_limit = "00:10:00"

[Tests.cmd_args]

# Each role uses tensor-parallel-size * pipeline-parallel-size = 4 GPUs, so a
# shared node must provide at least 4 (decode) + 4 (prefill) = 8 GPUs.
[Tests.cmd_args.dynamo.prefill_worker]
num-nodes = 2
[Tests.cmd_args.dynamo.prefill_worker.args]
tensor-parallel-size = 4
pipeline-parallel-size = 1

[Tests.cmd_args.dynamo.decode_worker]
num-nodes = 2
[Tests.cmd_args.dynamo.decode_worker.args]
tensor-parallel-size = 4
pipeline-parallel-size = 1

# Single AIPerf phase run against the shared-node deployment.
[[Tests.cmd_args.aiperf_phases]]
name = "shared_node_smoke"
[Tests.cmd_args.aiperf_phases.args]
concurrency = 2
request-count = 50
server-metrics = "auto"

[[Tests]]
id = "test.disagg.multinode"
test_name = "vLLM"
Expand Down
15 changes: 13 additions & 2 deletions doc/workloads/ai_dynamo.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ AI Dynamo jobs use three distinct types of nodes:
- **Prefill node(s)**: Handle the prefill stage of inference
- **Decode node(s)**: Handle the decode stage of inference (optional, depending on model and setup)

The total number of required nodes must be:
By default, when ``num_nodes`` is omitted, CloudAI allocates separate nodes for prefill and decode workers, so the total number of allocated nodes is:

::

num_prefill_nodes + num_decode_nodes

If there is a mismatch in the number of nodes between the schema and the test scenario, CloudAI will use the number of nodes specified in the test schema, ignoring the value in the test scenario.
Set top-level ``num_nodes`` explicitly to control the Slurm allocation. A value lower than
``num_prefill_nodes + num_decode_nodes`` enables shared-node disaggregated inference, where prefill and decode roles
run on the same allocated node(s) with separate GPU slices.

All node role assignments and orchestration are automatically managed by CloudAI.

Expand Down Expand Up @@ -303,6 +305,15 @@ If AIPerf accuracy mode is enabled, CloudAI copies ``aiperf_accuracy_artifacts/a

Navigate to ``./results/<scenario>/<test-id>/0/`` and open the CSV to examine performance metrics.

Shared-Node Disaggregated Runs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For Slurm, set top-level ``num_nodes`` lower than the sum of ``prefill_worker.num-nodes`` and
``decode_worker.num-nodes`` to run both roles on the same allocated node(s). For example, ``num_nodes = 1`` with
``prefill_worker.num-nodes = 1`` and ``decode_worker.num-nodes = 1`` runs one prefill worker and one decode worker on
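the same node. On each shared node, CloudAI assigns the first GPUs to the decode worker and the GPUs immediately
after them to the prefill worker; each role receives ``tensor-parallel-size * pipeline-parallel-size`` GPUs. The sum
of the two roles' GPU counts must not exceed the number of GPUs on a single node.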
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Example ``aiperf_report.csv``:

::
Expand Down
1 change: 1 addition & 0 deletions src/cloudai/_core/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class TestRun:
post_test: Optional[TestScenario] = None
reports: Set[Type[ReportGenerationStrategy]] = field(default_factory=set)
extra_srun_args: str | None = None
num_nodes_explicit: bool = False

def __hash__(self) -> int:
return hash(self.name + self.test.name + str(self.iterations) + str(self.current_iteration))
Expand Down
1 change: 1 addition & 0 deletions src/cloudai/test_scenario_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ def _create_test_run(
reports=get_reporters(test_info, tdef),
extra_srun_args=test_info.extra_srun_args,
exclude_nodes=test_info.exclude_nodes,
num_nodes_explicit="num_nodes" in test_info.model_fields_set,
)

return tr
Expand Down
17 changes: 17 additions & 0 deletions src/cloudai/workloads/ai_dynamo/ai_dynamo.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,23 @@ def constraint_check(self, tr: TestRun, system: Optional[System]) -> bool:
return False
logging.info("constraint_check passed for: tp_times_pp_le_gpus_per_node")

role_total_nodes = int(prefill_worker.num_nodes) + int(decode_worker.num_nodes)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
prefill_nodes = set(prefill_worker.nodes.split(",")) if prefill_worker.nodes else set()
decode_nodes = set(decode_worker.nodes.split(",")) if decode_worker.nodes else set()
has_explicit_allocation = getattr(tr, "num_nodes_explicit", False) or bool(tr.nodes)
shared_node_disagg = bool(prefill_nodes & decode_nodes) or (
has_explicit_allocation and tr.nnodes < role_total_nodes
)
if (
shared_node_disagg
and gpus_per_node > 0
and self.constraints.tp_times_pp_le_gpus_per_node
and (prefill_tp * prefill_pp + decode_tp * decode_pp > gpus_per_node)
):
logging.info("constraint_check failed for: shared_node_tp_pp_sum_le_gpus_per_node")
return False
logging.info("constraint_check passed for: shared_node_tp_pp_sum_le_gpus_per_node")

return True


Expand Down
109 changes: 93 additions & 16 deletions src/cloudai/workloads/ai_dynamo/ai_dynamo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ declare -A aiperf_accuracy_args
declare -A aiperf_accuracy_config

lmcache_controller_cmd=""
SHARED_NODE_DISAGG="false"

declare -A dynamo_args
dynamo_args["backend"]="vllm"
Expand Down Expand Up @@ -88,6 +89,18 @@ _csv_index_of() {
echo "-1"
}

# Report whether two comma-separated lists share at least one entry.
#
# Arguments:
#   $1 - first CSV list (e.g. "node1,node2")
#   $2 - second CSV list
# Returns:
#   0 if any non-empty entry of $1 also appears as a whole entry in $2,
#   1 otherwise (including when either list is empty).
#
# Splitting uses parameter expansion only, so the result does not depend on
# the caller's IFS (other code in this script sets IFS=',') and entries are
# never subject to pathname expansion. Whitespace is treated as an additional
# separator in both lists.
_csv_lists_overlap() {
    # A trailing comma guarantees every entry, including the last, ends in ','.
    local remaining="${1//[[:space:]]/,},"
    # Wrapping in commas lets ",entry," match whole entries only (n1 vs n10).
    local haystack=",${2//[[:space:]]/,},"
    local item

    while [[ -n "$remaining" ]]; do
        item="${remaining%%,*}"
        remaining="${remaining#*,}"
        # Skip empty entries produced by leading, trailing, or doubled separators.
        if [[ -n "$item" && "$haystack" == *",${item},"* ]]; then
            return 0
        fi
    done
    return 1
}

_gpus_per_node() {
local n=$(echo "${CUDA_VISIBLE_DEVICES:-}" | tr ',' '\n' | grep -c . || true)
[[ "$n" -gt 0 ]] && echo "$n" || echo "1"
Expand Down Expand Up @@ -238,7 +251,15 @@ _set_nodelists()
fi

if [[ -z "${prefill_config["node-list"]}" ]]; then
prefill_config["node-list"]=$(_populate_nodelist "${prefill_config["num-nodes"]}" "${decode_config["node-list"]}")
local allocated_nodes
local requested_role_nodes
allocated_nodes=$(_csv_len "$DYNAMO_NODELIST")
requested_role_nodes=$(( ${decode_config["num-nodes"]:-0} + ${prefill_config["num-nodes"]:-0} ))
if [[ "$allocated_nodes" -lt "$requested_role_nodes" ]]; then
prefill_config["node-list"]=$(_populate_nodelist "${prefill_config["num-nodes"]}" "")
else
prefill_config["node-list"]=$(_populate_nodelist "${prefill_config["num-nodes"]}" "${decode_config["node-list"]}")
fi
fi

# Prefill nodelist should match prefill node count (skip validation if num-nodes is 0)
Expand All @@ -256,6 +277,12 @@ _set_nodelists()
log "ERROR: number of nodes in decode nodelist (${decode_nodelist_count}) does not match decode node count (${decode_config["num-nodes"]})"
exit 1
fi

SHARED_NODE_DISAGG="false"
if _csv_lists_overlap "${prefill_config["node-list"]}" "${decode_config["node-list"]}"; then
SHARED_NODE_DISAGG="true"
log "Shared-node disaggregated mode: prefill and decode workers share node(s)"
fi
}

_has_connector() {
Expand Down Expand Up @@ -322,18 +349,33 @@ _compute_worker_allocation_vllm() {
exit 1
fi

if [[ "${prefill_config["multiple-workers-per-node"]}" != "true" ]]; then
prefill_config["gpus-per-worker"]=$num_gpus
fi
decode_config["gpu-offset"]=0
prefill_config["gpu-offset"]=0

if [[ "${decode_config["multiple-workers-per-node"]}" != "true" ]]; then
decode_config["gpus-per-worker"]=$num_gpus
if [[ "${SHARED_NODE_DISAGG}" == "true" ]]; then
local shared_gpus_needed=$(( prefill_config["gpus-per-worker"] + decode_config["gpus-per-worker"] ))
if [[ "$shared_gpus_needed" -gt "$num_gpus" ]]; then
log "ERROR: Not enough GPUs for shared-node disaggregated mode: need ${decode_config["gpus-per-worker"]} decode + ${prefill_config["gpus-per-worker"]} prefill, but only have ${num_gpus}"
exit 1
fi
decode_config["workers-per-node"]=1
prefill_config["workers-per-node"]=1
prefill_config["gpu-offset"]=${decode_config["gpus-per-worker"]}
else
if [[ "${prefill_config["multiple-workers-per-node"]}" != "true" ]]; then
prefill_config["gpus-per-worker"]=$num_gpus
fi

if [[ "${decode_config["multiple-workers-per-node"]}" != "true" ]]; then
decode_config["gpus-per-worker"]=$num_gpus
fi

prefill_config["workers-per-node"]=$(( num_gpus / prefill_config["gpus-per-worker"] ))
decode_config["workers-per-node"]=$(( num_gpus / decode_config["gpus-per-worker"] ))
fi

log "DECODE: num GPUs: $num_gpus, GPUs per worker: ${decode_config["gpus-per-worker"]}"
log "PREFILL: num GPUs: $num_gpus, GPUs per worker: ${prefill_config["gpus-per-worker"]}"
prefill_config["workers-per-node"]=$(( num_gpus / prefill_config["gpus-per-worker"] ))
decode_config["workers-per-node"]=$(( num_gpus / decode_config["gpus-per-worker"] ))
log "PREFILL: num GPUs: $num_gpus, GPUs per worker: ${prefill_config["gpus-per-worker"]}, GPU offset: ${prefill_config["gpu-offset"]}"
log "DECODE: workers per node: ${decode_config["workers-per-node"]}"
log "PREFILL: workers per node: ${prefill_config["workers-per-node"]}"

Expand Down Expand Up @@ -495,6 +537,15 @@ _gpu_list_for_worker() {
echo "$(echo $CUDA_VISIBLE_DEVICES | cut -d',' -f${start}-${end})"
}

# Print the comma-separated slice of CUDA_VISIBLE_DEVICES assigned to one worker.
#
# Arguments:
#   $1 - number of GPUs per worker
#   $2 - zero-based worker index
#   $3 - number of leading GPUs to skip before the first worker (default: 0)
# Output:
#   Fields (offset + idx * per_worker + 1) through
#   (offset + (idx + 1) * per_worker) of CUDA_VISIBLE_DEVICES, as selected by cut.
_gpu_list_for_worker_offset() {
    local gpus_each worker_idx skip first_field last_field
    gpus_each=$1
    worker_idx=$2
    skip=${3:-0}
    # cut numbers fields from 1, hence the leading 1.
    first_field=$(( 1 + skip + (worker_idx * gpus_each) ))
    last_field=$(( first_field + gpus_each - 1 ))
    echo "$CUDA_VISIBLE_DEVICES" | cut -d',' -f"${first_field}-${last_field}"
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

_log_file_for_worker() {
local role="$1"
local idx="$2"
Expand Down Expand Up @@ -891,13 +942,15 @@ function launch_decode()
local base_kv_event_port=${DYN_VLLM_KV_EVENT_PORT:-20080}
local base_kvbm_pub_port=${DYN_KVBM_LEADER_ZMQ_PUB_PORT:-56001}
local base_kvbm_ack_port=${DYN_KVBM_LEADER_ZMQ_ACK_PORT:-56002}
local base_system_port=${DYN_SYSTEM_PORT:-9090}
local kvbm_port_stride=2
local side_channel_host
side_channel_host="$(_current_node_ip)"
log "Launching $workers_per_node decode worker(s) with unique port ranges"

for i in $(seq 0 $(( $workers_per_node - 1 ))); do
local gpu_list=$(_gpu_list_for_worker "${decode_config["gpus-per-worker"]}" "$i")
local gpu_list
gpu_list=$(_gpu_list_for_worker "${decode_config["gpus-per-worker"]}" "$i")
local log_file=$(_log_file_for_worker "decode" "$i")
# Each worker needs unique port ranges to avoid ZMQ conflicts:
# - NIXL side channel: base_port + (worker_index * tp_size) for TP ranks
Expand All @@ -907,6 +960,7 @@ function launch_decode()
local kv_event_port=$((base_kv_event_port + i))
local kvbm_pub_port=$((base_kvbm_pub_port + (i * kvbm_port_stride)))
local kvbm_ack_port=$((base_kvbm_ack_port + (i * kvbm_port_stride)))
local system_port=$((base_system_port + i))

# Build decode args as proper bash arrays to preserve
# multi-word values (e.g. --cmd "genai-perf profile") through word splitting.
Expand All @@ -918,6 +972,7 @@ function launch_decode()
log "Launching decode worker $i on GPUs $gpu_list (NIXL host: $side_channel_host, NIXL port: $nixl_port, KV event port: $kv_event_port, KVBM pub/ack: $kvbm_pub_port/$kvbm_ack_port)"
log "Decode cmd: ${decode_config["cmd"]} ${args_arr[*]} ${decode_config["extra-args"]}"
CUDA_VISIBLE_DEVICES=$gpu_list \
DYN_SYSTEM_PORT=$system_port \
VLLM_NIXL_SIDE_CHANNEL_HOST="$side_channel_host" \
VLLM_NIXL_SIDE_CHANNEL_PORT=$nixl_port \
DYN_VLLM_KV_EVENT_PORT=$kv_event_port \
Expand Down Expand Up @@ -948,13 +1003,28 @@ function launch_prefill()
local base_kv_event_port=${DYN_VLLM_KV_EVENT_PORT:-20080}
local base_kvbm_pub_port=${DYN_KVBM_LEADER_ZMQ_PUB_PORT:-56001}
local base_kvbm_ack_port=${DYN_KVBM_LEADER_ZMQ_ACK_PORT:-56002}
local base_system_port=${DYN_SYSTEM_PORT:-9090}
local kvbm_port_stride=2
local gpu_offset=${prefill_config["gpu-offset"]:-0}
local side_channel_host
side_channel_host="$(_current_node_ip)"

if [[ "${SHARED_NODE_DISAGG}" == "true" ]]; then
local decode_workers=${decode_config["workers-per-node"]}
local decode_tp=${decode_args["--tensor-parallel-size"]}
base_nixl_port=$((base_nixl_port + (decode_workers * decode_tp)))
base_kv_event_port=$((base_kv_event_port + decode_workers))
base_kvbm_pub_port=$((base_kvbm_pub_port + (decode_workers * kvbm_port_stride)))
base_kvbm_ack_port=$((base_kvbm_ack_port + (decode_workers * kvbm_port_stride)))
base_system_port=$((base_system_port + decode_workers))
log "Shared-node prefill offsets: GPU offset=$gpu_offset, NIXL base=$base_nixl_port, KV event base=$base_kv_event_port, KVBM pub/ack base=$base_kvbm_pub_port/$base_kvbm_ack_port, system base=$base_system_port"
fi

log "Launching $workers_per_node prefill worker(s) with unique port ranges"

for i in $(seq 0 $(( $workers_per_node - 1 ))); do
local gpu_list=$(_gpu_list_for_worker "${prefill_config["gpus-per-worker"]}" "$i")
local gpu_list
gpu_list=$(_gpu_list_for_worker_offset "${prefill_config["gpus-per-worker"]}" "$i" "$gpu_offset")
local log_file=$(_log_file_for_worker "prefill" "$i")
# Each worker needs unique port ranges to avoid ZMQ conflicts:
# - NIXL side channel: base_port + (worker_index * tp_size) for TP ranks
Expand All @@ -964,6 +1034,7 @@ function launch_prefill()
local kv_event_port=$((base_kv_event_port + i))
local kvbm_pub_port=$((base_kvbm_pub_port + (i * kvbm_port_stride)))
local kvbm_ack_port=$((base_kvbm_ack_port + (i * kvbm_port_stride)))
local system_port=$((base_system_port + i))

# Build prefill args as proper bash arrays to preserve
# multi-word values (e.g. --cmd "genai-perf profile") through word splitting.
Expand All @@ -975,6 +1046,7 @@ function launch_prefill()
log "Launching prefill worker $i on GPUs $gpu_list (NIXL host: $side_channel_host, NIXL port: $nixl_port, KV event port: $kv_event_port, KVBM pub/ack: $kvbm_pub_port/$kvbm_ack_port)"
log "Prefill cmd: ${prefill_config["cmd"]} ${args_arr[*]} ${prefill_config["extra-args"]}"
CUDA_VISIBLE_DEVICES=$gpu_list \
DYN_SYSTEM_PORT=$system_port \
VLLM_NIXL_SIDE_CHANNEL_HOST="$side_channel_host" \
VLLM_NIXL_SIDE_CHANNEL_PORT=$nixl_port \
DYN_VLLM_KV_EVENT_PORT=$kv_event_port \
Expand Down Expand Up @@ -1031,19 +1103,24 @@ _resolve_aiperf_server_metrics_urls() {
local base_system_port=${DYN_SYSTEM_PORT:-9090}
local decode_workers_per_node=${decode_config["workers-per-node"]:-1}
local prefill_workers_per_node=${prefill_config["workers-per-node"]:-1}
local prefill_system_port_offset=0
local IFS_SAVE="$IFS"
local node i

if [[ "${SHARED_NODE_DISAGG}" == "true" ]]; then
prefill_system_port_offset=$decode_workers_per_node
fi

IFS=','
for node in ${prefill_config["node-list"]:-}; do
for i in $(seq 0 $(( prefill_workers_per_node - 1 ))); do
for node in ${decode_config["node-list"]:-}; do
for i in $(seq 0 $(( decode_workers_per_node - 1 ))); do
urls="${urls},http://${node}:$((base_system_port + i))/metrics"
done
done

for node in ${decode_config["node-list"]:-}; do
for i in $(seq 0 $(( decode_workers_per_node - 1 ))); do
urls="${urls},http://${node}:$((base_system_port + i))/metrics"
for node in ${prefill_config["node-list"]:-}; do
for i in $(seq 0 $(( prefill_workers_per_node - 1 ))); do
urls="${urls},http://${node}:$((base_system_port + prefill_system_port_offset + i))/metrics"
done
done

Expand Down
Loading
Loading