-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
211 lines (177 loc) · 9.98 KB
/
app.py
File metadata and controls
211 lines (177 loc) · 9.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
from flask import Flask, render_template, request, redirect, url_for, flash, session
import requests
import json
import sys
app = Flask(__name__)
app.secret_key = 'your_very_secret_key_for_sessions' # Important: Change this and keep it secret
# --- API URL Configuration (from your script) ---
BASE_CONVAI_URL = "https://api.elevenlabs.io/v1/convai/agents"
CREATE_AGENT_URL = f"{BASE_CONVAI_URL}/create"
# --- Helper Function for GET Requests (from your script, adapted slightly for web context) ---
def make_api_get_request(url, api_key, params=None, timeout=20):
headers = {"xi-api-key": api_key, "Content-Type": "application/json"}
try:
response = requests.get(url, headers=headers, params=params, timeout=timeout)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as http_err:
error_message = f"HTTP error: {http_err}"
try:
error_details = http_err.response.json()
if isinstance(error_details, dict) and "detail" in error_details:
if isinstance(error_details["detail"], dict) and "message" in error_details["detail"]:
error_message += f" - Server message: {error_details['detail']['message']}"
elif isinstance(error_details["detail"], str):
error_message += f" - Server message: {error_details['detail']}"
except json.JSONDecodeError:
error_message += f" - Raw response: {http_err.response.text}"
flash(error_message, 'error')
except requests.exceptions.RequestException as req_err:
flash(f"Request error: {req_err}", 'error')
except Exception as e:
flash(f"An unexpected error occurred: {e}", 'error')
return None
# --- Functions from your script (adapted) ---
def list_agents_for_selection(api_key):
data = make_api_get_request(BASE_CONVAI_URL, api_key)
if data and "agents" in data:
return data["agents"]
return []
def get_agent_details_from_source(agent_id, api_key):
agent_url = f"{BASE_CONVAI_URL}/{agent_id}"
return make_api_get_request(agent_url, api_key)
def get_existing_agent_names(api_key):
data = make_api_get_request(BASE_CONVAI_URL, api_key)
agent_names = set()
if data and "agents" in data and data["agents"]:
for agent in data["agents"]:
if agent.get("name"):
agent_names.add(agent.get("name"))
return agent_names
def create_agent_in_target_logic(original_agent_data, target_api_key):
# This is the core logic from your create_agent_in_target function
# It will return a tuple: (success_boolean, message_or_new_agent_id)
if not original_agent_data:
return False, "Error: No original agent data provided."
original_name = original_agent_data.get("name", "Unnamed Agent")
target_agent_names = get_existing_agent_names(target_api_key)
base_new_name = f"{original_name} - Copy"
current_new_name = base_new_name
copy_counter = 1
while current_new_name in target_agent_names:
copy_counter += 1
current_new_name = f"{base_new_name} {copy_counter}"
new_name = current_new_name
status_messages = []
if new_name != base_new_name:
status_messages.append(f"Name '{base_new_name}' already exists in target. Using unique name: '{new_name}'")
else:
status_messages.append(f"Using name for new agent: '{new_name}'")
payload = {}
if "conversation_config" not in original_agent_data:
return False, f"Skipping '{original_name}': 'conversation_config' not found."
conversation_config_copy = json.loads(json.dumps(original_agent_data["conversation_config"]))
if 'tts' in conversation_config_copy and isinstance(conversation_config_copy.get('tts'), dict) and \
'model_id' in conversation_config_copy['tts']:
original_tts_model = conversation_config_copy['tts']['model_id']
allowed_tts_models = ['eleven_turbo_v2', 'eleven_turbo_v2_5', 'eleven_flash_v2', 'eleven_flash_v2_5']
if original_tts_model not in allowed_tts_models:
status_messages.append(f"Warning: Original TTS model_id '{original_tts_model}' replaced with 'eleven_turbo_v2'.")
conversation_config_copy['tts']['model_id'] = 'eleven_turbo_v2'
kb_cleared = False
if 'agent' in conversation_config_copy and isinstance(conversation_config_copy.get('agent'), dict) and \
'prompt' in conversation_config_copy['agent'] and isinstance(conversation_config_copy['agent'].get('prompt'), dict) and \
'knowledge_base' in conversation_config_copy['agent']['prompt'] and \
isinstance(conversation_config_copy['agent']['prompt'].get('knowledge_base'), list):
if conversation_config_copy['agent']['prompt']['knowledge_base']:
status_messages.append(f"Info: Cleared {len(conversation_config_copy['agent']['prompt']['knowledge_base'])} KB references from agent.prompt.knowledge_base.")
kb_cleared = True
conversation_config_copy['agent']['prompt']['knowledge_base'] = []
if kb_cleared:
status_messages.append("KB references cleared. Re-link manually in target account.")
payload["conversation_config"] = conversation_config_copy
payload["name"] = new_name
if "platform_settings" in original_agent_data:
platform_settings_copy = json.loads(json.dumps(original_agent_data["platform_settings"]))
if "auth" in platform_settings_copy and isinstance(platform_settings_copy.get('auth'), dict) and \
"shareable_token" in platform_settings_copy["auth"]:
del platform_settings_copy["auth"]["shareable_token"]
payload["platform_settings"] = platform_settings_copy
if "tags" in original_agent_data:
payload["tags"] = original_agent_data["tags"]
target_headers = {"xi-api-key": target_api_key, "Content-Type": "application/json"}
try:
response = requests.post(CREATE_AGENT_URL, headers=target_headers, json=payload, timeout=30)
response.raise_for_status()
new_agent_details = response.json()
success_msg = f"SUCCESS: Agent '{new_name}' created with ID: {new_agent_details.get('agent_id')}"
status_messages.append(success_msg)
return True, "\n".join(status_messages)
except requests.exceptions.HTTPError as http_err:
error_message = f"HTTP error creating agent '{new_name}': {http_err}"
# ... (similar detailed error parsing as in make_api_get_request) ...
status_messages.append(error_message)
except Exception as e:
status_messages.append(f"Unexpected error: {e}")
return False, "\n".join(status_messages)
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
source_api_key = request.form.get('source_api_key')
if not source_api_key:
flash('Source API key is required.', 'error')
return redirect(url_for('index'))
session['source_api_key'] = source_api_key # Store in session
agents = list_agents_for_selection(source_api_key)
if agents is None: # list_agents_for_selection now flashes errors
agents = [] # Ensure agents is iterable for the template
# Store fetched agents in session to avoid re-fetching if form validation fails later
session['source_agents_summary'] = [{'id': ag.get('agent_id'), 'name': ag.get('name')} for ag in agents]
return render_template('index.html', agents=agents, source_key_provided=True)
# GET request or initial load
agents = []
source_key_provided = 'source_api_key' in session
if source_key_provided and 'source_agents_summary' in session:
# If returning to page and agents were already fetched (e.g. after a failed copy attempt)
# Re-populate from session to avoid re-listing unless key changes
# This part needs more robust logic for a real app, but for simplicity:
temp_agents = session.get('source_agents_summary', [])
# For the template, we just need id and name
agents_for_display = [{'agent_id': ag['id'], 'name': ag['name']} for ag in temp_agents]
return render_template('index.html', agents=agents_for_display, source_key_provided=source_key_provided)
return render_template('index.html', agents=agents, source_key_provided=False)
@app.route('/copy', methods=['POST'])
def copy_agent_route():
source_api_key = session.get('source_api_key')
selected_agent_id = request.form.get('selected_agent_id')
target_api_key = request.form.get('target_api_key')
if not source_api_key:
flash('Source API key session expired or not provided. Please start over.', 'error')
session.pop('source_agents_summary', None) # Clear previous agent list
return redirect(url_for('index'))
if not selected_agent_id:
flash('No agent selected for copying.', 'error')
return redirect(url_for('index')) # Redirect back, agents should still be listed if session is good
if not target_api_key:
flash('Target API key is required.', 'error')
return redirect(url_for('index')) # Redirect back
agent_details = get_agent_details_from_source(selected_agent_id, source_api_key)
if not agent_details:
flash(f"Failed to fetch details for agent ID {selected_agent_id}.", 'error')
return redirect(url_for('index'))
if target_api_key == source_api_key:
# For a web app, a checkbox "Copy within same account?" might be better than a confirm prompt
flash("Copying within the same account.", 'info')
success, message = create_agent_in_target_logic(agent_details, target_api_key)
if success:
flash(message, 'success')
else:
flash(message, 'error')
# Clear source key and agent list after copy attempt to force re-entry for new operation
session.pop('source_api_key', None)
session.pop('source_agents_summary', None)
return redirect(url_for('index'))
if __name__ == '__main__':
# This is for local development only
# For Cloudflare, you won't run it this way
app.run(debug=True)