Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions kosmos/agents/flight.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import ast
import re
import time

import kosmos.utils as U
from langchain_openai import ChatOpenAI
from langchain.prompts import SystemMessagePromptTemplate
from langchain.prompts import AIMessage, HumanMessage, SystemMessage

from kosmos.prompts import load_prompt
from kosmos.control_primitives_context import load_control_primitives_context

class FlightAgent:
def __init__(
self,
model_name="gpt-4o",
temperature=0,
request_timeout=120,
checkpoint_dir="checkpoint",
chat_log=True,
execution_error=True,
):
self.checkpoint_dir = checkpoint_dir
self.chat_log = chat_log
self.execution_error = execution_error
U.f_mkdir(f"{checkpoint_dir}/action")
self.llm = ChatOpenAI(
model_name=model_name,
temperature=temperature,
request_timeout=request_timeout,
)

def construct_system_message(self, skills=[]):
system_template = load_prompt("action_template")
base_skills = [
# load control primitives
]
programs = "\n\n".join(load_control_primitives_context(base_skills) + skills)
response_format = load_prompt("action_response_format")
system_message_prompt = SystemMessagePromptTemplate.from_template(
system_template
)
system_message = system_message_prompt.format(
programs=programs, response_format=response_format
)
assert isinstance(system_message, SystemMessage)
return system_message

def construct_human_message(self, *, events, code="", task="", context="", audit=""):
chat_messages = []
error_messages = []
assert events[-1][0] == "observe", "Last event must be observe"
for i, (event_type, event) in enumerate(events):
# Process events here - placeholder for now
pass

observation = ""

if code:
observation += f"Code from the last round:\n{code}\n\n"
else:
observation += f"Code from the last round: No code in the first round\n\n"

if self.execution_error:
if error_messages:
error = "\n".join(error_messages)
observation += f"Execution error:\n{error}\n\n"
else:
observation += f"Execution error: No error\n\n"

if self.chat_log:
if chat_messages:
chat_log = "\n".join(chat_messages)
observation += f"Chat log: {chat_log}\n\n"
else:
observation += f"Chat log: None\n\n"

# FIXME: add all the game telemetry to the observation

observation += f"Task: {task}\n\n"

if context:
observation += f"Context: {context}\n\n"
else:
observation += f"Context: None\n\n"

if audit:
observation += f"Audit: {audit}\n\n"
else:
observation += f"Critique: None\n\n"

return HumanMessage(content=observation)

def process_ai_message(self, message):
assert isinstance(message, AIMessage)

retry = 3
error = None
while retry > 0:
try:
code_pattern = re.compile(r"```(?:python|py)(.*?)```", re.DOTALL)
code = "\n".join(code_pattern.findall(message.content))
parsed = ast.parse(code)
functions = []
assert len(parsed.body) > 0, "No functions found"

for node in parsed.body:
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue

node_type = "AsyncFunctionDef" if isinstance(node, ast.AsyncFunctionDef) else "FunctionDef"

# Extract function parameters
params = []
for arg in node.args.args:
params.append(arg.arg)

# Get function source code (simplified - in practice you might want to use ast.unparse)
functions.append({
"name": node.id.name,
"type": node_type,
"body": ast.unparse(node),
"params": params,
})

# Find the last async function
main_function = None
for function in reversed(functions):
if function["type"] == "AsyncFunctionDef":
main_function = function
break

assert (
main_function is not None
), "No async function found. Your main function must be async."
assert (
len(main_function["params"]) == 1
and main_function["params"][0] == "bot"
), f"Main function {main_function['name']} must take a single argument named 'bot'"

program_code = "\n\n".join(function["body"] for function in functions)
exec_code = f"await {main_function['name']}(bot)"

return {
"program_code": program_code,
"program_name": main_function["name"],
"exec_code": exec_code,
}
except Exception as e:
retry -= 1
error = e
time.sleep(1)
return f"Error parsing action response (before program execution): {error}"

def summarize_chatlog(self, events):
chatlog = set()
for event_type, event in events:
if event_type == "onChat":
chatlog.add(event["onChat"])
return "I also need " + ", ".join(chatlog) + "." if chatlog else ""
7 changes: 7 additions & 0 deletions kosmos/prompts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import importlib.util
import os
import kosmos.utils as U

def load_prompt(prompt_name):
package_path = importlib.util.find_spec("kosmos").submodule_search_locations[0]
return U.load_text(f"{package_path}/prompts/{prompt_name}.txt")
13 changes: 13 additions & 0 deletions kosmos/prompts/flight_response_format.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Explain: ...
Plan: ...
1) ...
2) ...
3) ...
...
Code:
```python
# helper functions (only if needed, try to avoid them)
...
# main function after the helper functions
def your_main_function_name(conn, vessel):
# ...
53 changes: 53 additions & 0 deletions kosmos/prompts/flight_template.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
You are a helpful assistant that writes kRPC and MechJeb python code to complete any Kerbel Space Program task specified by me.

Here are some useful programs written with the kRPC and MechJeb APIs.

{programs}


At each round of conversation, I will give you
Code from the last round: ...
Execution error: ...
Chat log: ...
Telemetry log: ...
Current body: ...
Mission time: ...
Vessel status: ...
Nearby vessels (nearest to farthest): ...
Fuel remaining: ...
Battery charge: ...
Position: ...
Velocity: ...
Altitude: ...
Orbit parameters: ...
Part status: ...
Resources: ...
Task: ...
Context: ...
Critique: ...

You should then respond to me with
Explain (if applicable): Are there any steps missing in your plan? Why does the code not complete the task? What does the chat log and execution error imply?
Plan: How to complete the task step by step. You should pay attention to to Resources and Part status since they tell what you have available. The task completeness check is also based on your final orbital state and mission objective.
Code:
1) Write a function taking the connection and vessel as arguments.
2) Reuse the above useful programs as much as possible.
- Use `launch_to_orbit(conn, vessel, target_altitude, target_inclination)` for launches. Do not use raw staging and throttle control.
- Use `execute_maneuver_node(conn, vessl, mode)` for burns. Do not use manual throttle control during burns.
- Use `automated_landing(conn, vessel, target_lat, target_lon)` for landings. Do not use manual landing procedures.
- Use `transfer_to_body(conn, vessel, target_body)` for interplanetary transfers. Do not calculate transfer windows manually.
- Use `rendezvous_with_target(conn, vessel, target)` for docking operations. Do not use manual approach procedures.
3) Your function will be reused for building more complex missions. Therefore, you should make it generic and reusable. You should not make strong assumptions about the vessel configuration (as it may be changed between missions), and therefore you should always check whether you have the required parts and resources before using them. If not, you should return appropriate error messages.
4) Functions in the "Code from the last round" section will not be saved or executed. Do not reuse functions listed there.
5) Anything defined outside a function will be ignored, define all your variables inside your functions.
6) Use `print()` statements to show intermediate progress and telemetry data.
7) Use `wait_for_soi_change(conn, vessel, target_body)` when waiting for sphere of influence changes during transfers. You should monitor vessel status during long burns or coast phases.
8) Always check `conn.space_center.active_vessel` to ensure you're controlling the correct vessel. Vessel references can change during scene transitions.
9) Do not write infinite loops without proper exit conditions based on mission objectives or failure states.
10) Do not use `conn.add_stream()` without properly removing streams when done. Always clean up resources.
11) Handle MechJeb availability with mj.available()` checks before using autopilots. Wait for MechJeb to initialize if needed.
12) Name your function in a meaningful way that describes the mission phase or objective.

You should only respond in the format as described below:
RESPONSE FORMAT:
{response_format}
Loading