-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathrun_python_code.py
More file actions
118 lines (95 loc) · 4.17 KB
/
run_python_code.py
File metadata and controls
118 lines (95 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import asyncio
import docker
from unidecode import unidecode
import os
from logs import Log
from utils import convert_username
# Human-readable description of this addon and the conventions the generated
# code and replies must follow (presumably surfaced to the model as the tool
# description -- confirm against the addon loader).
description = "\n".join(
    (
        "This addon allows you to execute python code in a non persistant terminal, When opening files be sure to open from /data/filename.",
        "Always include print statements to track the progress or path and name(s) of generated files.",
        "Save any generated files in the /data/ directory with the format /data/filename.ext.",
        "You must always display media that's been saved in the /data/ directory, using the markdown format [description](data/filename.ext) or html tags for video's (without triple quotes).",
        "When asked to create videos, follow these rules: Create a video (make sure the code works), convert to h264 using ffmpeg subprocess, include in response as HTML tag, avoid triple quotes around HTML tag, images should always immediately be shown in your response in markdown format.",
    )
)

# JSON-Schema-style declaration of the arguments the addon accepts.
# Only "content" is mandatory; "pip_packages" may be omitted.
parameters = {
    "type": "object",
    "properties": {
        # The source code to run.
        "content": {
            "type": "string",
            "description": "The python code to be executed, Include prints to see the status and output, especially the path and name of generated files.",
        },
        # Extra requirements to install before the code runs.
        "pip_packages": {
            "type": "array",
            "items": {"type": "string"},
            "description": "The list of pip packages to be installed before executing the code.",
        },
    },
    "required": ["content"],
}
async def run_python_code(content, pip_packages=None, previous_content="", username=None):
    """Run ``sync_run_python_code`` without blocking the event loop.

    The synchronous implementation is handed to the running loop's default
    executor and its result is awaited.

    Args:
        content: Python source to execute.
        pip_packages: Optional list of pip packages to install first.
            ``None`` (the default) means "no packages".
        previous_content: Optional source that the synchronous implementation
            prepends to ``content``.
        username: User on whose behalf the code is executed.

    Returns:
        Whatever ``sync_run_python_code`` returns for the same arguments.
    """
    # Normalise here so the worker always receives an iterable, even when the
    # caller passes an explicit None for the optional argument.
    packages = [] if pip_packages is None else pip_packages
    # Inside a coroutine the running loop is the one we want; this avoids the
    # implicit loop-creation behaviour of get_event_loop().
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, sync_run_python_code, content, packages, previous_content, username
    )
def sync_run_python_code(content, pip_packages=None, previous_content="", username=None):
    """Run python code inside a throwaway Docker container and report the result.

    A fresh container named after the converted username is started from the
    ``goodaidev/charlie-mnemonic-python-env`` image with the user's ``data``
    directory mounted read-write at ``/data``. Any existing container with the
    same name is removed first. The requested pip packages are installed, the
    code is executed with ``python3 -c`` and the container is removed again,
    whether or not the run succeeded.

    Args:
        content: Python source to execute.
        pip_packages: Optional iterable of pip requirement strings to install
            before running the code. ``None`` means "no packages".
        previous_content: Optional source that is prepended to ``content``
            (separated by a newline) before execution.
        username: Selects the user's data directory and names the container.
            Required.

    Returns:
        dict: ``{"pip": ..., "output": ..., "exit_code": ...}`` once the code
        has been executed (``output`` is capped at 2000 characters), or
        ``{"error": message}`` when the username is missing, a package fails
        to install, or any Docker call raises.
    """
    # Without a username there is neither a data directory to mount nor a
    # container name; fail with a clear message instead of a TypeError text.
    if username is None:
        return {"error": "A username is required to run python code."}

    # Convert the email to a name without special characters to name our container.
    if username:
        username = convert_username(username)

    code = previous_content + "\n" + content if previous_content else content

    try:
        client = docker.from_env()

        # Host directory holding per-user data; falls back to ./users.
        user_root = os.environ.get("CHARLIE_MNEMONIC_USER_DIR") or os.path.join(
            os.getcwd(), "users"
        )
        host_data_dir = os.path.join(user_root, username, "data")
        volumes = {host_data_dir: {"bind": "/data", "mode": "rw"}}

        # Container names must be unique, so drop any leftover from a previous run.
        for existing in client.containers.list(all=True):
            if existing.name == username:
                existing.remove(force=True)

        container = client.containers.run(
            "goodaidev/charlie-mnemonic-python-env",
            name=username,
            detach=True,
            tty=True,
            volumes=volumes,
        )
        try:
            pip_string = ""
            for package in pip_packages or ():
                install = container.exec_run(f"pip install {package}")
                if install.exit_code != 0:
                    return {
                        "error": "Failed to install package: "
                        + install.output.decode("utf-8", errors="replace")
                    }
                pip_string += f"successfully installed package: {package}\n"

            # Pass argv as a list so the source reaches python3 verbatim; a
            # command string would be split with shell-like quoting rules by
            # the Docker SDK and would need manual escaping.
            exec_result = container.exec_run(["python3", "-c", code])
            # Program output is arbitrary bytes; never let decoding fail the run.
            output = exec_result.output.decode("utf-8", errors="replace")[:2000]
            return {
                "pip": pip_string,
                "output": output,
                "exit_code": exec_result.exit_code,
            }
        finally:
            # Always clean up, including on the early pip-failure return and
            # on exceptions; a failure here is reported by the outer handler.
            container.remove(force=True)
    except Exception as e:
        return {"error": str(e)}
def escape_string(s):
    """Return ``s`` with every backslash and double quote backslash-escaped.

    Each ``\\`` becomes ``\\\\`` and each ``"`` becomes ``\\"``, so the result
    can be embedded between double quotes in a command string.
    """
    # One pass over the text; each source character is mapped independently,
    # so inserted backslashes are never escaped a second time.
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"'})
    return s.translate(escapes)