Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions veadk/community/langchain_ai/tools/execute_skills.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
from typing import List, Optional

from langchain.tools import ToolRuntime, tool

from veadk.auth.veauth.utils import get_credential_from_vefaas_iam
from veadk.config import getenv
from veadk.utils.logger import get_logger
from veadk.utils.volcengine_sign import ve_request

logger = get_logger(__name__)


def _clean_ansi_codes(text: str) -> str:
"""Remove ANSI escape sequences (color codes, etc.)"""
import re

ansi_escape = re.compile(r"\x1b\[[0-9;]*m")
return ansi_escape.sub("", text)


def _format_execution_result(result_str: str) -> str:
"""Format the execution results, handle escape characters and JSON structures"""
try:
result_json = json.loads(result_str)

if not result_json.get("success"):
message = result_json.get("message", "Unknown error")
outputs = result_json.get("data", {}).get("outputs", [])
if outputs and isinstance(outputs[0], dict):
error_msg = outputs[0].get("ename", "Unknown error")
return f"Execution failed: {message}, {error_msg}"

outputs = result_json.get("data", {}).get("outputs", [])
if not outputs:
return "No output generated"

formatted_lines = []
for output in outputs:
if output and isinstance(output, dict) and "text" in output:
text = output["text"]
text = _clean_ansi_codes(text)
text = text.replace("\\n", "\n")
formatted_lines.append(text)

return "".join(formatted_lines).strip()

except json.JSONDecodeError:
return _clean_ansi_codes(result_str)
except Exception as e:
logger.warning(f"Error formatting result: {e}, returning raw result")
return result_str


@tool
def execute_skills(
workflow_prompt: str,
runtime: ToolRuntime,
skills: Optional[List[str]] = None,
timeout: int = 900,
) -> str:
"""execute skills in a code sandbox and return the output.
For C++ code, don't execute it directly, compile and execute via Python; write sources and object files to /tmp.

Args:
workflow_prompt (str): instruction of workflow
skills (Optional[List[str]]): The skills will be invoked
timeout (int, optional): The timeout in seconds for the code execution, less than or equal to 900. Defaults to 900.

Returns:
str: The output of the code execution.
"""

tool_id = getenv("AGENTKIT_TOOL_ID")

service = getenv(
"AGENTKIT_TOOL_SERVICE_CODE", "agentkit"
) # temporary service for code run tool
region = getenv("AGENTKIT_TOOL_REGION", "cn-beijing")
host = getenv(
"AGENTKIT_TOOL_HOST", service + "." + region + ".volces.com"
) # temporary host for code run tool
logger.debug(f"tools endpoint: {host}")

session_id = runtime.session_id # type: ignore
agent_name = runtime.context.agent_name # type: ignore
user_id = runtime.context.user_id # type: ignore
tool_user_session_id = agent_name + "_" + user_id + "_" + session_id
logger.debug(f"tool_user_session_id: {tool_user_session_id}")

logger.debug(
f"Execute skills in session_id={session_id}, tool_id={tool_id}, host={host}, service={service}, region={region}, timeout={timeout}"
)

header = {}

ak = os.getenv("VOLCENGINE_ACCESS_KEY")
sk = os.getenv("VOLCENGINE_SECRET_KEY")
if not (ak and sk):
logger.debug(
"Get AK/SK from environment variables failed. Try to use credential from Iam."
)
credential = get_credential_from_vefaas_iam()
ak = credential.access_key_id
sk = credential.secret_access_key
header = {"X-Security-Token": credential.session_token}
else:
logger.debug("Successfully get AK/SK from environment variables.")

cmd = ["python", "agent.py", workflow_prompt]
if skills:
cmd.extend(["--skills"] + skills)

# TODO: remove after agentkit supports custom environment variables setting
res = ve_request(
request_body={},
action="GetCallerIdentity",
ak=ak,
sk=sk,
service="sts",
version="2018-01-01",
region=region,
host="sts.volcengineapi.com",
header=header,
)
try:
account_id = res["Result"]["AccountId"]
except KeyError as e:
logger.error(f"Error occurred while getting account id: {e}, response is {res}")
return res

env_vars = {
"TOS_SKILLS_DIR": f"tos://agentkit-platform-{account_id}/skills/",
"TOOL_USER_SESSION_ID": tool_user_session_id,
}

code = f"""
import subprocess
import os
import time
import select
import sys

env = os.environ.copy()
for key, value in {env_vars!r}.items():
if key not in env:
env[key] = value

process = subprocess.Popen(
{cmd!r},
cwd='/home/gem/veadk_skills',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env,
bufsize=1,
universal_newlines=True
)

start_time = time.time()
timeout = {timeout - 10}

with open('/tmp/agent.log', 'w') as log_file:
while True:
if time.time() - start_time > timeout:
process.kill()
log_file.write('log_type=stderr request_id=x function_id=y revision_number=1 Process timeout\\n')
break

reads = [process.stdout.fileno(), process.stderr.fileno()]
ret = select.select(reads, [], [], 1)

for fd in ret[0]:
if fd == process.stdout.fileno():
line = process.stdout.readline()
if line:
log_file.write(f'log_type=stdout request_id=x function_id=y revision_number=1 {{line}}')
log_file.flush()
if fd == process.stderr.fileno():
line = process.stderr.readline()
if line:
log_file.write(f'log_type=stderr request_id=x function_id=y revision_number=1 {{line}}')
log_file.flush()

if process.poll() is not None:
break

for line in process.stdout:
log_file.write(f'log_type=stdout request_id=x function_id=y revision_number=1 {{line}}')
for line in process.stderr:
log_file.write(f'log_type=stderr request_id=x function_id=y revision_number=1 {{line}}')

with open('/tmp/agent.log', 'r') as log_file:
output = log_file.read()
print(output)
"""

res = ve_request(
request_body={
"ToolId": tool_id,
"UserSessionId": tool_user_session_id,
"OperationType": "RunCode",
"OperationPayload": json.dumps(
{
"code": code,
"timeout": timeout,
"kernel_name": "python3",
}
),
},
action="InvokeTool",
ak=ak,
sk=sk,
service=service,
version="2025-10-30",
region=region,
host=host,
header=header,
)
logger.debug(f"Invoke run code response: {res}")

try:
return _format_execution_result(res["Result"]["Result"])
except KeyError as e:
logger.error(f"Error occurred while running code: {e}, response is {res}")
return res
2 changes: 1 addition & 1 deletion veadk/community/langchain_ai/tools/load_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def load_memory(query: str, runtime: ToolRuntime) -> list[str]:
return ["Long-term memory store is not initialized."]

app_name = store.index
user_id = runtime.context.user_id
user_id = runtime.context.user_id # type: ignore

logger.info(f"Load memory for user {user_id} with query {query}")
response = store.search((app_name, user_id), query=query)
Expand Down
112 changes: 112 additions & 0 deletions veadk/community/langchain_ai/tools/run_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os

from langchain.tools import ToolRuntime, tool

from veadk.auth.veauth.utils import get_credential_from_vefaas_iam
from veadk.config import getenv
from veadk.utils.logger import get_logger
from veadk.utils.volcengine_sign import ve_request

logger = get_logger(__name__)


@tool
def run_code(code: str, language: str, runtime: ToolRuntime, timeout: int = 30) -> str:
"""Run code in a code sandbox and return the output.
For C++ code, don't execute it directly, compile and execute via Python; write sources and object files to /tmp.

Args:
code (str): The code to run.
language (str): The programming language of the code. Language must be one of the supported languages: python3.
timeout (int, optional): The timeout in seconds for the code execution. Defaults to 30.

Returns:
str: The output of the code execution.
"""

tool_id = getenv("AGENTKIT_TOOL_ID")

service = getenv(
"AGENTKIT_TOOL_SERVICE_CODE", "agentkit"
) # temporary service for code run tool
region = getenv("AGENTKIT_TOOL_REGION", "cn-beijing")
host = getenv(
"AGENTKIT_TOOL_HOST", service + "." + region + ".volces.com"
) # temporary host for code run tool
scheme = os.getenv("AGENTKIT_TOOL_SCHEME", "https").lower()
if scheme not in {"http", "https"}:
scheme = "https"
logger.debug(f"tools endpoint: {host}")

session_id = runtime.context.session_id # type: ignore
user_id = runtime.context.user_id # type: ignore
agent_name = runtime.context.agent_name # type: ignore

tool_user_session_id = agent_name + "_" + user_id + "_" + session_id
logger.debug(f"tool_user_session_id: {tool_user_session_id}")

logger.debug(
f"Running code in language: {language}, session_id={session_id}, code={code}, tool_id={tool_id}, host={host}, service={service}, region={region}, timeout={timeout}"
)

header = {}

logger.debug("Get AK/SK from tool context failed.")
ak = os.getenv("VOLCENGINE_ACCESS_KEY")
sk = os.getenv("VOLCENGINE_SECRET_KEY")
if not (ak and sk):
logger.debug(
"Get AK/SK from environment variables failed. Try to use credential from Iam."
)
credential = get_credential_from_vefaas_iam()
ak = credential.access_key_id
sk = credential.secret_access_key
header = {"X-Security-Token": credential.session_token}
else:
logger.debug("Successfully get AK/SK from environment variables.")

res = ve_request(
request_body={
"ToolId": tool_id,
"UserSessionId": tool_user_session_id,
"OperationType": "RunCode",
"OperationPayload": json.dumps(
{
"code": code,
"timeout": timeout,
"kernel_name": language,
}
),
},
action="InvokeTool",
ak=ak,
sk=sk,
service=service,
version="2025-10-30",
region=region,
host=host,
header=header,
scheme=scheme, # type: ignore
)
logger.debug(f"Invoke run code response: {res}")

try:
return res["Result"]["Result"]
except KeyError as e:
logger.error(f"Error occurred while running code: {e}, response is {res}")
return res
Loading