-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoperators.py
More file actions
228 lines (194 loc) · 7.99 KB
/
operators.py
File metadata and controls
228 lines (194 loc) · 7.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
Blender operators for LLM Assistant
"""
import bpy
import asyncio
import threading
import traceback
from typing import Tuple
from bpy.props import StringProperty
from bpy.types import Operator
from .claude_client import ClaudeClient
from .agents import PlanningAgent, CodeGenerationAgent
# Global state for async operations
_execution_queue = []
_execution_lock = threading.Lock()
_current_status = {"status": "idle", "message": "", "error": None}
_pending_code = None # Code waiting to be executed on main thread
_current_clients = [] # Track active clients for cleanup
def get_preferences():
"""Get addon preferences"""
prefs = bpy.context.preferences.addons[__name__.split('.')[0]].preferences
return prefs
def update_status(status: str, message: str, error: str = None):
"""Update global status"""
global _current_status
with _execution_lock:
_current_status = {
"status": status,
"message": message,
"error": error
}
def get_status():
"""Get current status"""
global _current_status
with _execution_lock:
return _current_status.copy()
def execute_code_safely(code: str) -> Tuple[bool, str]:
"""Execute Blender Python code safely (must be called from main thread)"""
try:
# Create a safe execution environment
namespace = {
'__builtins__': __builtins__,
'bpy': bpy,
'mathutils': __import__('mathutils'),
}
# Execute code
exec(code, namespace)
return True, "Code executed successfully"
except Exception as e:
error_msg = f"Error: {str(e)}\n{traceback.format_exc()}"
return False, error_msg
def execute_pending_code():
"""Execute pending code on main thread (called by Blender timer)"""
global _pending_code
with _execution_lock:
if _pending_code is None:
return 0.05 # Check more frequently (0.05s = 50ms)
code = _pending_code
_pending_code = None
# Execute on main thread immediately
update_status("executing", "Executing generated code...")
success, result = execute_code_safely(code)
if success:
update_status("complete", f"Successfully created your model!", None)
else:
update_status("error", f"Code execution failed:\n{result}", result)
return None # Stop timer
async def process_prompt_async(prompt: str):
"""Async function to process user prompt through both agents"""
global _current_clients
client = None
fast_client = None
try:
update_status("planning", "Planning agent analyzing your request...")
# Get preferences
prefs = get_preferences()
if not prefs.api_key:
update_status("error", "Please set your Anthropic API key in addon preferences", "No API key configured")
return
# Initialize clients and agents
client = ClaudeClient(prefs.api_key, prefs.model)
with _execution_lock:
_current_clients.append(client)
planning_agent = PlanningAgent(client, use_fast_model=prefs.use_fast_planning)
code_agent = CodeGenerationAgent(client)
# Track fast client if created
if hasattr(planning_agent, 'fast_client') and planning_agent.fast_client != client:
fast_client = planning_agent.fast_client
with _execution_lock:
_current_clients.append(fast_client)
# Step 1: Planning (skip if option enabled)
if prefs.skip_planning:
update_status("generating", "Skipping planning for faster generation...")
plan = {
"understanding": prompt,
"steps": [{"step_number": 1, "description": prompt, "blender_operations": [], "details": ""}],
"complexity": "simple",
"estimated_operations": []
}
else:
update_status("planning", "Breaking down your request into steps...")
plan = await planning_agent.plan(prompt)
# Step 2: Code generation
update_status("generating", "Code generation agent creating Blender Python code...")
# Skip context gathering for speed - not critical for most requests
code = await code_agent.generate_code(plan, prompt, context=None)
# Step 3: Queue code for execution on main thread
global _pending_code
with _execution_lock:
_pending_code = code
update_status("executing", "Queued code for execution on main thread...")
except asyncio.CancelledError:
update_status("idle", "Generation cancelled")
except Exception as e:
error_msg = f"Error: {str(e)}\n{traceback.format_exc()}"
update_status("error", f"An error occurred: {str(e)}", error_msg)
finally:
# Clean up clients
with _execution_lock:
if client in _current_clients:
_current_clients.remove(client)
if fast_client and fast_client in _current_clients:
_current_clients.remove(fast_client)
# Close clients
if client:
try:
await client.close()
except:
pass
if fast_client:
try:
await fast_client.close()
except:
pass
def run_async_in_thread(coro):
"""Run async coroutine in a new thread"""
def run_in_thread():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(coro)
finally:
loop.close()
thread = threading.Thread(target=run_in_thread, daemon=True)
thread.start()
return thread
class LLM_ASSISTANT_OT_Generate(Operator):
"""Generate Blender model from prompt"""
bl_idname = "llm_assistant.generate"
bl_label = "Generate from Prompt"
bl_description = "Use AI to generate Blender model from text prompt"
bl_options = {'REGISTER', 'UNDO'}
prompt: StringProperty(
name="Prompt",
description="Describe what you want to create in Blender",
default="",
)
def execute(self, context):
if not self.prompt:
self.report({'ERROR'}, "Please enter a prompt")
return {'CANCELLED'}
# Run async operation in thread
run_async_in_thread(process_prompt_async(self.prompt))
self.report({'INFO'}, "Processing your request... Check status panel")
return {'FINISHED'}
class LLM_ASSISTANT_OT_Stop(Operator):
"""Stop current generation"""
bl_idname = "llm_assistant.stop"
bl_label = "Stop Generation"
bl_description = "Stop the current AI generation process"
bl_options = {'REGISTER'}
def execute(self, context):
global _current_clients
# Cancel all active clients
with _execution_lock:
for client in _current_clients:
if hasattr(client, 'cancel'):
client.cancel()
_current_clients.clear()
update_status("idle", "Generation stopped by user")
self.report({'INFO'}, "Generation stopped")
return {'FINISHED'}
def register():
bpy.utils.register_class(LLM_ASSISTANT_OT_Generate)
bpy.utils.register_class(LLM_ASSISTANT_OT_Stop)
# Register timer to check for pending code execution - faster polling
if not bpy.app.timers.is_registered(execute_pending_code):
bpy.app.timers.register(execute_pending_code, first_interval=0.05)
def unregister():
# Unregister timer
if bpy.app.timers.is_registered(execute_pending_code):
bpy.app.timers.unregister(execute_pending_code)
bpy.utils.unregister_class(LLM_ASSISTANT_OT_Generate)
bpy.utils.unregister_class(LLM_ASSISTANT_OT_Stop)