import sys
import time
import json
import signal
from types import SimpleNamespace
from typing import Dict, Any, Optional
from RestrictedPython import compile_restricted, safe_globals
import RestrictedPython.Guards
import io
import contextlib
import resource
class PythonExecutorSkill:
    """Run Python snippets in a separate, resource-limited interpreter.

    A snippet is first screened against a small substring blacklist and is
    then executed by a helper program in a child process. The child applies
    CPU-time and address-space limits to itself (where the ``resource``
    module is available), captures the snippet's stdout/stderr and reports
    the outcome back to the parent as JSON.

    Every execution returns a dict with the keys ``status`` (``'success'``
    or ``'error'``), ``stdout``, ``stderr``, ``result`` and
    ``execution_time``.

    NOTE(security): the blacklist is a best-effort substring filter, not a
    security boundary; the process limits are the only enforced isolation.
    """

    # Substrings that make ``_validate_code`` reject a snippet.
    _FORBIDDEN_PATTERNS = (
        'import os', 'import sys', 'import subprocess', 'exec(', 'eval(',
        '__import__', 'open(', 'file(', 'exit(', 'quit(', 'globals()', 'locals()',
    )

    def __init__(self, timeout: int = 10, memory_limit_mb: int = 100):
        """Store the default limits.

        Args:
            timeout: Default CPU/wall-clock budget in seconds.
            memory_limit_mb: Address-space limit for the child, in MiB.
        """
        self.timeout = timeout
        self.memory_limit_bytes = memory_limit_mb * 1024 * 1024

    def _set_limits(self):
        """Apply the CPU and address-space limits to the *current* process.

        Only effective on Unix-like systems; failures are ignored so that the
        call is a harmless no-op where the limits cannot be set.
        """
        try:
            resource.setrlimit(resource.RLIMIT_CPU, (self.timeout, self.timeout))
            resource.setrlimit(
                resource.RLIMIT_AS,
                (self.memory_limit_bytes, self.memory_limit_bytes),
            )
        except (AttributeError, ValueError, OSError):
            pass

    def _validate_code(self, code: str) -> bool:
        """Return ``False`` when *code* contains any blacklisted substring.

        This is a plain substring test, so it can both over- and under-match;
        treat it as a first-line filter only.
        """
        return not any(pattern in code for pattern in self._FORBIDDEN_PATTERNS)

    @staticmethod
    def _error_result(message: str, execution_time: float = 0.0) -> Dict[str, Any]:
        """Build the standard result dict for a failed execution."""
        return {
            'status': 'error',
            'stdout': '',
            'stderr': message,
            'result': None,
            'execution_time': execution_time,
        }

    @staticmethod
    def _runner_source() -> str:
        """Return the source of the helper program run in the child process.

        The helper receives everything it needs through ``sys.argv`` (context
        pickle path, code path, CPU seconds, memory bytes) instead of having
        values interpolated into its source, so paths containing quotes or
        backslashes cannot corrupt the script.
        """
        import textwrap

        return textwrap.dedent('''
            import io
            import json
            import pickle
            import sys
            import time
            import traceback

            context_path, code_path, cpu_seconds, memory_bytes = sys.argv[1:5]

            try:
                import resource
                resource.setrlimit(
                    resource.RLIMIT_CPU, (int(cpu_seconds), int(cpu_seconds)))
                resource.setrlimit(
                    resource.RLIMIT_AS, (int(memory_bytes), int(memory_bytes)))
            except (ImportError, AttributeError, ValueError, OSError):
                pass

            start_time = time.time()
            # The pickle was written by the parent process for this run only.
            with open(context_path, "rb") as fh:
                namespace = pickle.load(fh)
            with open(code_path, "r", encoding="utf-8") as fh:
                source = fh.read()

            real_stdout, real_stderr = sys.stdout, sys.stderr
            captured_stdout = io.StringIO()
            captured_stderr = io.StringIO()
            sys.stdout, sys.stderr = captured_stdout, captured_stderr
            result = None
            error = None
            try:
                # A single namespace is used as globals so that functions
                # defined by the snippet can see the context variables.
                exec(compile(source, "<string>", "exec"), namespace)
                result = namespace.get("_result")
            except Exception:
                error = traceback.format_exc()
            finally:
                sys.stdout, sys.stderr = real_stdout, real_stderr

            stderr_text = captured_stderr.getvalue()
            if error:
                stderr_text += error
            output = {
                "status": "error" if error else "success",
                "stdout": captured_stdout.getvalue(),
                "stderr": stderr_text or None,
                "result": result,
                "execution_time": time.time() - start_time,
            }
            print(json.dumps(output, default=str))
        ''')

    def execute_in_subprocess(
        self,
        code: str,
        context_vars: Dict[str, Any],
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Execute *code* in a child interpreter and return its result dict.

        Args:
            code: Python source to run. A variable named ``_result`` left in
                the namespace is returned under the ``result`` key.
            context_vars: Picklable variables made available to the snippet.
            timeout: Budget in seconds; defaults to ``self.timeout``. The
                child gets this as its CPU limit and the parent waits two
                seconds longer before killing it.

        Returns:
            The result dict described on the class. Non-JSON-serialisable
            values are stringified by the child (``default=str``).

        Raises:
            Exception: whatever ``pickle.dump`` raises when *context_vars*
                cannot be pickled, or ``OSError`` when the temporary files
                cannot be written.
        """
        import os
        import pickle
        import subprocess
        import tempfile

        effective_timeout = self.timeout if timeout is None else timeout
        # RLIMIT_CPU takes whole seconds; never hand the child a 0s budget.
        cpu_seconds = max(1, int(effective_timeout))

        temp_paths = []
        try:
            # NOTE(security): pickle is only safe here because the child reads
            # back a file this process has just written; never feed it
            # externally supplied data.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pkl') as fh:
                temp_paths.append(fh.name)
                pickle.dump(context_vars, fh)
            with tempfile.NamedTemporaryFile(
                mode='w', delete=False, suffix='.py', encoding='utf-8'
            ) as fh:
                temp_paths.append(fh.name)
                fh.write(self._runner_source())
            with tempfile.NamedTemporaryFile(
                mode='w', delete=False, suffix='_code.py', encoding='utf-8'
            ) as fh:
                temp_paths.append(fh.name)
                fh.write(code)

            context_path, runner_path, code_path = temp_paths
            completed = subprocess.run(
                [
                    sys.executable,
                    runner_path,
                    context_path,
                    code_path,
                    str(cpu_seconds),
                    str(self.memory_limit_bytes),
                ],
                capture_output=True,
                text=True,
                errors='replace',
                timeout=effective_timeout + 2,
                cwd=tempfile.gettempdir(),
            )
        except subprocess.TimeoutExpired:
            return self._error_result('Execution timed out', float(effective_timeout))
        finally:
            # Best-effort cleanup of whichever temp files were created.
            for path in temp_paths:
                try:
                    os.unlink(path)
                except OSError:
                    pass

        if completed.returncode != 0:
            return self._error_result(
                completed.stderr or 'Subprocess execution failed'
            )
        try:
            return json.loads(completed.stdout)
        except json.JSONDecodeError:
            # e.g. the snippet terminated the interpreter before the report
            # was printed.
            return self._error_result(
                completed.stderr or 'Subprocess produced no parsable result'
            )

    def execute(
        self,
        code: str,
        context_vars: Optional[Dict[str, Any]] = None,
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Validate *code* and run it in a child process.

        Args:
            code: Python source to run.
            context_vars: Optional variables exposed to the snippet.
            timeout: Optional per-call budget in seconds; a falsy value falls
                back to the instance default.

        Returns:
            The result dict described on the class; a rejected snippet yields
            an error result without starting a process.
        """
        if not self._validate_code(code):
            return self._error_result('Code contains forbidden patterns')
        return self.execute_in_subprocess(
            code, context_vars or {}, timeout or self.timeout
        )
from langchain_core.tools import Tool
def create_python_executor_tool() -> Tool:
    """Build the ``python_executor`` tool backed by a ``PythonExecutorSkill``.

    The tool takes a JSON string describing one execution request and
    returns a plain-text report. It never raises: every failure is reported
    as a ``"Tool execution error: ..."`` string.

    Returns:
        A ``Tool`` named ``python_executor`` whose ``func`` is ``run_code``.
    """
    executor = PythonExecutorSkill(timeout=15, memory_limit_mb=128)

    def run_code(input_str: str) -> str:
        """Execute the request encoded in *input_str* and format the outcome.

        Expected JSON object fields:
            code: Python source to run (required, non-empty string).
            context_vars: optional object of variables exposed to the code.
            timeout: optional positive number of seconds; when absent the
                executor's configured default is used.
        """
        try:
            input_data = json.loads(input_str)
        except (TypeError, ValueError) as exc:
            return f"Tool execution error: input is not valid JSON ({exc})"

        if not isinstance(input_data, dict):
            return "Tool execution error: input must be a JSON object with a 'code' field"

        code = input_data.get("code", "")
        if not isinstance(code, str) or not code.strip():
            return "Tool execution error: 'code' must be a non-empty string"

        context_vars = input_data.get("context_vars") or {}
        if not isinstance(context_vars, dict):
            return "Tool execution error: 'context_vars' must be a JSON object"

        # No hard-coded fallback: an absent timeout defers to the executor.
        timeout = input_data.get("timeout")
        if timeout is not None and (
            isinstance(timeout, bool)
            or not isinstance(timeout, (int, float))
            or timeout <= 0
        ):
            return "Tool execution error: 'timeout' must be a positive number"

        try:
            result = executor.execute(code, context_vars, timeout)
            output = []
            if result['stdout']:
                output.append(f"STDOUT:\n{result['stdout']}")
            if result['stderr']:
                output.append(f"STDERR:\n{result['stderr']}")
            if result['result'] is not None:
                output.append(f"RESULT:\n{result['result']}")
            output.append(f"Execution time: {result['execution_time']:.2f}s")
            return "\n".join(output)
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            return f"Tool execution error: {str(e)}"

    return Tool(
        name="python_executor",
        description="Execute Python code safely in a sandboxed environment. Input must be a JSON string with 'code' field.",
        func=run_code
    )
import pandas as pd
# Demo run, executed whenever this module is loaded: hand a small DataFrame
# to the executor, print its summary statistics inside the child process and
# collect the column means through the snippet's ``_result`` variable.
data = dict(A=[1, 2, 3, 4, 5], B=[10, 20, 30, 40, 50])
df = pd.DataFrame(data=data)
executor = PythonExecutorSkill()
result = executor.execute(
    (
        "print(df.describe())\n"
        "_result = df.mean().to_dict()"
    ),
    {"df": df},
)
print("Result:", result)