Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
414 changes: 414 additions & 0 deletions opentinker/backend_patch/verl/trainer/ppo/wmc_erc.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions opentinker/client/client_config/alfworld_param.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ algorithm: "agent_loop"
# - "grpo" : Standard GRPO (outcome-only advantage)
# - "grpo_per_step" : Per-step GRPO with return-based advantages (for multi-turn tasks)
# - "gae" : Generalized Advantage Estimation (for PPO, requires critic)
adv_estimator: "grpo"
adv_estimator: "gae"
# rollout_n: number of samples per prompt for GRPO/grpo_per_step
# For PPO (gae), rollout_n is typically 1
rollout_n: 8
rollout_n: 1

# Interaction configuration
interaction:
Expand All @@ -60,7 +60,7 @@ interaction:
env_endpoint: http://${interaction.config.env_host}:${interaction.config.env_port}
# If you run the ALFWorld env server in sharded mode (--shards N),
# set env_shards=N. The client will route each instance_id to a stable shard.
env_shards: 32
env_shards: 8
max_steps: 20 # ALFWorld episodes max steps
max_total_steps: 20 # Max environment step calls (controls rollout turns)
observation_template: "{observation}"
Expand Down
95 changes: 95 additions & 0 deletions opentinker/client/client_config/alfworld_wmc_erc_param.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# ALFWorld Training Configuration with WMC-ERC Dynamic Entropy Clipping
# Use with: python alfworld_rl.py --config-name alfworld_wmc_erc_param
#
# WMC-ERC (World Model-Conditioned Entropy Regularized Co-evolution):
# Uses the LLM's prediction entropy at env token positions as a World Model
# uncertainty signal (H_WM) to dynamically gate policy gradient updates.
# Prevents entropy collapse in well-understood regions while permitting
# exploration in uncertain ones.

# Project settings
project_name: opentinker
experiment_name: alfworld_wmc_erc

# Logging
logger_backends: ["console", "wandb"]

# Tracing (optional)
enable_tracing: true
weave_project: null

# WandB (optional)
wandb_key: null

# Model and tokenizer
tokenizer_path: null

# Training parameters
batch_size: 8
num_workers: 4
num_epochs: null
num_steps: 1000
save_freq: 200
test_freq: 50

# Validation parameters
val_batch_size: 50

# Generation parameters
temperature: 1
top_p: 1
max_new_tokens: 4096
max_prompt_tokens: 2048

# Algorithm (must be agent_loop for multi-turn)
algorithm: "agent_loop"

# PPO with GAE advantage estimation (requires critic)
adv_estimator: "gae"
rollout_n: 1

# WMC-ERC: Dynamic Entropy Clipping
# - mu_base: base clipping coefficient (controls tightness of the gate)
# - lambda_wm: how much WM uncertainty widens the gate (higher = more tolerant in unknown regions)
# - enable: master switch
wmc_erc:
enable: true
mu_base: 1.0
lambda_wm: 10.0
entropy_floor: 0.1
# Adaptive entropy control via beta_token (replaces fixed entropy_coeff)
# beta = base_entropy_coeff * max(0, 1 - turn_entropy/target) per turn
# -> full bonus at entropy=0, zero bonus at entropy=target, never pushes above target
entropy_target: 2.0
base_entropy_coeff: 0.02

# Disable fixed entropy_coeff since adaptive beta_token handles it
entropy_coeff: 0.0

# Interaction configuration
interaction:
name: alfworld
class_path: opentinker.environment.gym_environment_interaction.GymEnvironmentInteraction
config:
env_host: 0.0.0.0
env_port: 8092
env_endpoint: http://${interaction.config.env_host}:${interaction.config.env_port}
env_shards: 8
max_steps: 20
max_total_steps: 20
observation_template: "{observation}"
split: train

multi_turn:
max_user_turns: ${interaction.config.max_total_steps}
max_assistant_turns: ${interaction.config.max_total_steps}
max_tokens_per_turn: 512
weave_project: null
experiment_name: "alfworld_wmc_erc"

# Scheduler settings
scheduler_url: "http://0.0.0.0:8780"
scheduler_api_key: null

# GPU settings
num_gpus: 8
27 changes: 26 additions & 1 deletion opentinker/client/utils/http_training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,38 @@ def set_config(self, args: DictConfig, env=None):
server_cfg = OmegaConf.merge(
server_cfg,
OmegaConf.create(
{"actor_rollout_ref": {"rollout": {"agent": {"num_workers": agent_num_workers}}}}
{
"actor_rollout_ref": {
"rollout": {"agent": {"num_workers": agent_num_workers}}
}
}
),
)
print(
f"[ServiceClient] Overriding agent num_workers to: {agent_num_workers}"
)

# Pass WMC-ERC config to server if present
wmc_erc_cfg = getattr(args, "wmc_erc", None)
if wmc_erc_cfg is not None:
wmc_erc_dict = OmegaConf.to_container(wmc_erc_cfg, resolve=True)
server_cfg = OmegaConf.merge(
server_cfg,
OmegaConf.create({"wmc_erc": wmc_erc_dict}),
)
print(f"[ServiceClient] Passing WMC-ERC config to server: {wmc_erc_dict}")

# Pass entropy_coeff to server actor config if present
entropy_coeff = getattr(args, "entropy_coeff", None)
if entropy_coeff is not None:
server_cfg = OmegaConf.merge(
server_cfg,
OmegaConf.create(
{"actor_rollout_ref": {"actor": {"entropy_coeff": entropy_coeff}}}
),
)
print(f"[ServiceClient] Setting entropy_coeff: {entropy_coeff}")

generation_config = {
"temperature": args.temperature,
"top_p": args.top_p,
Expand Down
46 changes: 13 additions & 33 deletions opentinker/scheduler/job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def check_gpu_available(gpu_id: int) -> bool:
return True # Fail open

# Thresholds for considering a GPU "idle"
MAX_MEMORY_MB = 10 # Allow up to 100 MB (some baseline CUDA overhead)
MAX_MEMORY_MB = 1000 # Allow up to 1000 MB (some overhead from Ray/CUDA init)
MAX_UTILIZATION = 1000 # Allow up to 5% utilization

if memory_used_mb > MAX_MEMORY_MB or utilization_percent > MAX_UTILIZATION:
Expand All @@ -294,35 +294,9 @@ def check_gpu_available(gpu_id: int) -> bool:
)
return False

# Check 2: Look for running processes on this GPU
pmon_result = subprocess.run(
["nvidia-smi", "pmon", "-c", "1", "-s", "um"],
capture_output=True,
text=True,
timeout=5,
)

if pmon_result.returncode == 0:
# Parse pmon output to check for processes on this GPU
# Format: "# gpu pid type sm mem enc dec command"
# " 0 12345 C 50 500 0 0 python"
lines = pmon_result.stdout.strip().split("\n")
for line in lines:
if line.startswith("#") or not line.strip():
continue
parts = line.split()
if len(parts) >= 2:
try:
gpu_idx = int(parts[0].strip())
if gpu_idx == gpu_id and parts[1].strip() != "-":
# Found a process on this GPU
pid = parts[1].strip()
logger.warning(
f"GPU {gpu_id}: ⚠️ OCCUPIED - Process {pid} detected via pmon"
)
return False
except (ValueError, IndexError):
continue
# Check 2: pmon process check - SKIPPED to allow GPU sharing for small models
# When using small models (e.g. 0.5B), GPU sharing is safe as long as
# total memory fits. The memory threshold above handles this.

# All checks passed - GPU is idle
logger.debug(
Expand Down Expand Up @@ -1085,12 +1059,18 @@ def _launch_server(self, job: JobInfo) -> subprocess.Popen:
if kl_config:
use_kl_in_reward = kl_config.get("use_kl_in_reward")
if use_kl_in_reward is not None:
cmd.append(f"algorithm.use_kl_in_reward={str(use_kl_in_reward).lower()}")
logger.info(f"Job {job.job_id}: ✓ KL use_kl_in_reward={use_kl_in_reward}")
cmd.append(
f"algorithm.use_kl_in_reward={str(use_kl_in_reward).lower()}"
)
logger.info(
f"Job {job.job_id}: ✓ KL use_kl_in_reward={use_kl_in_reward}"
)

use_kl_loss = kl_config.get("use_kl_loss")
if use_kl_loss is not None:
cmd.append(f"actor_rollout_ref.actor.use_kl_loss={str(use_kl_loss).lower()}")
cmd.append(
f"actor_rollout_ref.actor.use_kl_loss={str(use_kl_loss).lower()}"
)
logger.info(f"Job {job.job_id}: ✓ KL use_kl_loss={use_kl_loss}")

kl_loss_coef = kl_config.get("kl_loss_coef")
Expand Down
52 changes: 50 additions & 2 deletions opentinker/server/http_training_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import asyncio
import base64
import gc
import logging
import signal
import sys
Expand Down Expand Up @@ -943,7 +944,9 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]:
# episodes into individual per-turn training samples, the gen_batch_output
# batch size is larger than the original batch. We need to expand the
# original batch to match using the expansion index.
expansion_index = gen_batch_output.meta_info.pop('per_turn_expansion_index', None)
expansion_index = gen_batch_output.meta_info.pop(
"per_turn_expansion_index", None
)
if expansion_index is not None:
logger.info(
f"[Per-turn training] Expanding original batch from {len(batch)} to "
Expand All @@ -956,6 +959,7 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]:
elif batch.batch is not None:
# Empty TensorDict (all keys were popped) — create new one with expanded size
from tensordict import TensorDict

batch.batch = TensorDict({}, batch_size=[len(expansion_index)])
# Expand non-tensor batch
expanded_non_tensor = {}
Expand Down Expand Up @@ -1080,6 +1084,7 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]:
# ===== DEBUG LOGGING END =====

metrics.update(old_log_prob_metrics)
_wmc_erc_entropys = entropys # Preserve for WMC-ERC before pop
old_log_prob.batch.pop("entropys")
batch = batch.union(old_log_prob)
logger.info(
Expand Down Expand Up @@ -1161,6 +1166,23 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]:
config=self.config.algorithm,
)

# 9.5 WMC-ERC: Dynamic entropy clipping
wmc_erc_cfg = OmegaConf.select(self.config, "wmc_erc", default=None)
if wmc_erc_cfg and wmc_erc_cfg.get("enable", False):
from opentinker.backend_patch.verl.trainer.ppo.wmc_erc import (
apply_wmc_erc,
)

batch, wmc_metrics = apply_wmc_erc(
batch, _wmc_erc_entropys, wmc_erc_cfg
)
metrics.update(wmc_metrics)
logger.info(
f"[WMC-ERC] mask_ratio={wmc_metrics.get('wmc_erc/mask_ratio', 'N/A'):.3f}, "
f"s_star={wmc_metrics.get('wmc_erc/s_star_mean', 'N/A'):.4f}, "
f"h_wm={wmc_metrics.get('wmc_erc/h_wm_mean', 'N/A'):.4f}"
)

# 10. Update critic
if self.use_critic:
with marked_timer("update_critic", timing_raw, color="pink"):
Expand Down Expand Up @@ -1258,6 +1280,13 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]:

logger.info(f"Training step {self.global_steps} completed successfully")

# Free large intermediates and force garbage collection to prevent OOM
del batch, gen_batch, gen_batch_output, reward_tensor
if "_wmc_erc_entropys" in dir():
del _wmc_erc_entropys
gc.collect()
torch.cuda.empty_cache()

return {
"status": "success",
"metrics": metrics,
Expand Down Expand Up @@ -1535,7 +1564,9 @@ def validate_step(self, batch: DataProto) -> Dict[str, Any]:

# 6. Merge original batch and generated output
# Per-turn training expansion: expand batch if gen output is larger
expansion_index = gen_batch_output.meta_info.pop('per_turn_expansion_index', None)
expansion_index = gen_batch_output.meta_info.pop(
"per_turn_expansion_index", None
)
if expansion_index is not None:
logger.info(
f"[Per-turn training] Validation: Expanding original batch from {len(batch)} to "
Expand All @@ -1547,6 +1578,7 @@ def validate_step(self, batch: DataProto) -> Dict[str, Any]:
elif batch.batch is not None:
# Empty TensorDict (all keys were popped) — create new one with expanded size
from tensordict import TensorDict

batch.batch = TensorDict({}, batch_size=[len(expansion_index)])
expanded_non_tensor = {}
for k, v in batch.non_tensor_batch.items():
Expand Down Expand Up @@ -2106,6 +2138,14 @@ def run_fastapi_server():
namespace=_server_cfg.ray.namespace,
num_gpus=_server_cfg.trainer.n_gpus_per_node, # Explicitly specify number of GPUs
ignore_reinit_error=True,
runtime_env={
"env_vars": {
"NCCL_CUMEM_ENABLE": "0",
"VLLM_DISABLE_SLEEP_MODE": "1",
"RAY_memory_usage_threshold": "0.99",
"VLLM_GPU_MEMORY_UTILIZATION": "0.15",
},
},
)
else:
# Connect to existing Ray cluster at specific address
Expand All @@ -2114,6 +2154,14 @@ def run_fastapi_server():
address=_server_cfg.ray.address,
namespace=_server_cfg.ray.namespace,
ignore_reinit_error=True,
runtime_env={
"env_vars": {
"NCCL_CUMEM_ENABLE": "0",
"VLLM_DISABLE_SLEEP_MODE": "1",
"RAY_memory_usage_threshold": "0.99",
"VLLM_GPU_MEMORY_UTILIZATION": "0.15",
},
},
)

# Verify GPU availability
Expand Down
2 changes: 1 addition & 1 deletion opentinker/server/launch_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def main(cfg):

cfg.actor_rollout_ref.rollout.tensor_model_parallel_size = 2
cfg.actor_rollout_ref.rollout.name = "vllm"
cfg.actor_rollout_ref.rollout.gpu_memory_utilization = 0.6
cfg.actor_rollout_ref.rollout.gpu_memory_utilization = 0.15

# GRPO/GRPO-per-step 特定配置
# grpo_per_step uses the same training framework as grpo, just with different advantage estimation
Expand Down
Loading
Loading