-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathopensearch_setup.py
More file actions
420 lines (386 loc) · 15.2 KB
/
Copy pathopensearch_setup.py
File metadata and controls
420 lines (386 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
"""
Setup script for OpenSearch ML Commons models, pipelines, and connectors.
Converts curl commands to opensearch-py client calls.
"""
import os
import time
import json
from opensearchpy import OpenSearch
from opensearchpy.exceptions import RequestError
def wait_for_task(client, task_id, max_wait=300, poll_interval=2):
    """Poll an ML Commons task until it completes, fails, or times out.

    Args:
        client: Object exposing ``transport.perform_request(method, url)``.
            The response is read with ``.get``, so a dict-like is expected.
        task_id: Identifier interpolated into ``/_plugins/_ml/tasks/{task_id}``.
        max_wait: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        The task response whose ``state`` is ``'COMPLETED'``.

    Raises:
        RuntimeError: If the task reports ``state == 'FAILED'``.
        TimeoutError: If the task did not complete within ``max_wait`` seconds.
        Exception: Any request error whose message does not contain
            "not found" is re-raised unchanged.
    """
    print(f"Waiting for task {task_id} to complete...")
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    started = time.monotonic()
    while time.monotonic() - started < max_wait:
        # Only the request itself is guarded. The FAILED error raised further
        # down must never pass through this handler, otherwise a failure whose
        # text contains "not found" would be retried until the timeout.
        try:
            response = client.transport.perform_request(
                'GET',
                f'/_plugins/_ml/tasks/{task_id}',
            )
        except Exception as exc:
            if 'not found' in str(exc).lower():
                # Task might not be immediately available
                time.sleep(poll_interval)
                continue
            raise

        state = response.get('state', '')
        if state == 'COMPLETED':
            print(f"Task {task_id} completed successfully")
            return response
        if state == 'FAILED':
            error = response.get('error', 'Unknown error')
            raise RuntimeError(f"Task {task_id} failed: {error}")
        time.sleep(poll_interval)

    raise TimeoutError(f"Task {task_id} did not complete within {max_wait} seconds")
def _send_json(client, method, path, body):
    """Send ``body`` as a JSON payload through the client's transport and return the response."""
    return client.transport.perform_request(
        method,
        path,
        body=json.dumps(body),
        headers={'Content-Type': 'application/json'},
    )


def _register_model(client, label, body, deploy=False):
    """Register an ML Commons model, wait for its task, and return the model id.

    Args:
        client: OpenSearch client used for the request and for task polling.
        label: Human-readable model name used in the progress message.
        body: Registration payload sent to ``/_plugins/_ml/models/_register``.
        deploy: When true, ``?deploy=true`` is appended to the register URL.

    Returns:
        The ``model_id`` reported by the finished registration task.

    Raises:
        RuntimeError: If the register response carries no ``task_id`` or the
            finished task carries no ``model_id``.
    """
    path = '/_plugins/_ml/models/_register'
    if deploy:
        path += '?deploy=true'
    response = _send_json(client, 'POST', path, body)
    task_id = response.get('task_id')
    if not task_id:
        # Polling "/tasks/None" would only spin until the timeout; fail now.
        raise RuntimeError(f"{label} model registration returned no task_id: {response}")
    print(f"✓ {label} model registration started: {task_id}")
    task_response = wait_for_task(client, task_id)
    model_id = task_response.get('model_id')
    if not model_id:
        raise RuntimeError(f"{label} model task {task_id} returned no model_id: {task_response}")
    return model_id


def _notes_index_body(ingest_pipeline):
    """Build the settings and mappings for the notes index.

    ``ingest_pipeline`` becomes the index's ``default_pipeline``.
    """
    return {
        "settings": {
            "index": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "default_pipeline": ingest_pipeline,
                "knn": True,
            },
            "analysis": {
                "analyzer": {
                    "default": {
                        "type": "standard"
                    }
                }
            },
        },
        "mappings": {
            "properties": {
                "note_id": {"type": "integer"},
                "title": {"type": "text", "analyzer": "standard"},
                "content": {"type": "text", "analyzer": "standard"},
                "text_to_embed": {"type": "text"},
                # NOTE(review): the ingest pipeline created by setup_opensearch
                # writes its vector to "content_embedding", while the
                # knn_vector mapping is declared for "embedding" — confirm
                # which field the search queries are meant to use.
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 384,
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "engine": "lucene",
                        "parameters": {
                            "ef_construction": 128,
                            "m": 24,
                        },
                    },
                },
                "category": {"type": "keyword"},
                "tags": {"type": "keyword"},
                "user_id": {"type": "integer"},
                "created_at": {"type": "date"},
                "color": {"type": "keyword"},
            }
        },
    }


def setup_opensearch():
    """Set up OpenSearch cluster with ML models, pipelines, and connectors.

    Runs ten steps in order: cluster settings, model group, embedding model,
    ingest pipeline, notes index (an existing one is deleted first),
    highlighting model, hybrid RRF search pipeline, DeepSeek connector,
    remote DeepSeek model, and RAG search pipeline.

    The client connects to ``OPENSEARCH_HOST``/``OPENSEARCH_PORT`` (defaults
    ``localhost``/``9200``) with SSL and certificate verification disabled.
    The connector credential is read from ``DEEPSEEK_API_KEY``.

    Returns:
        dict: ``OPENSEARCH_HOST``, ``OPENSEARCH_PORT`` and every ID/name
        created by the steps, keyed by environment-variable name.

    Raises:
        Exception: Any error from a step is printed with a step-specific
            message and re-raised; later steps are not attempted.
    """
    # Initialize OpenSearch client
    client = OpenSearch(
        hosts=[{
            'host': os.getenv('OPENSEARCH_HOST', 'localhost'),
            'port': int(os.getenv('OPENSEARCH_PORT', 9200)),
        }],
        use_ssl=False,
        verify_certs=False,
    )

    # Store IDs and configuration
    config = {}
    ingest_pipeline = 'note-embedding-pipeline'
    index_name = 'notes'
    rrf_pipeline = 'hybrid-rrf-pipeline'
    rag_pipeline = 'my-conversation-search-pipeline-deepseek-chat'

    print("=" * 60)
    print("Starting OpenSearch Setup")
    print("=" * 60)

    # Step 1: Configure Cluster Settings
    print("\n1. Configuring cluster settings...")
    try:
        cluster_settings = {
            "persistent": {
                "plugins.ml_commons.allow_registering_model_via_url": "true",
                "plugins.ml_commons.only_run_on_ml_node": "false",
                "plugins.ml_commons.model_access_control_enabled": "true",
                "plugins.ml_commons.native_memory_threshold": "99",
            }
        }
        client.cluster.put_settings(body=cluster_settings)
        print("✓ Cluster settings configured")
    except Exception as e:
        print(f"✗ Error configuring cluster settings: {e}")
        raise

    # Step 2: Register Model Group
    print("\n2. Registering model group...")
    try:
        model_group_body = {
            "name": "note_search_with_highlighter",
            "description": "Models for note search with highlighter",
        }
        response = _send_json(
            client, 'POST', '/_plugins/_ml/model_groups/_register', model_group_body
        )
        model_group_id = response.get('model_group_id')
        config['MODEL_GROUP_ID'] = model_group_id
        print(f"✓ Model group registered: {model_group_id}")
    except Exception as e:
        print(f"✗ Error registering model group: {e}")
        raise

    # Step 3: Register Embedding Model
    print("\n3. Registering embedding model...")
    try:
        embedding_model_body = {
            "name": "huggingface/sentence-transformers/all-MiniLM-L6-v2",
            "version": "1.0.1",
            "model_format": "TORCH_SCRIPT",
            "model_group_id": model_group_id,
        }
        # Deployment is requested at registration time, like the highlighting
        # and remote models below: the ingest pipeline created in step 4 is
        # the index's default pipeline and references this model.
        embedding_model_id = _register_model(
            client, 'Embedding', embedding_model_body, deploy=True
        )
        config['EMBEDDING_MODEL_ID'] = embedding_model_id
        print(f"✓ Embedding model registered: {embedding_model_id}")
    except Exception as e:
        print(f"✗ Error registering embedding model: {e}")
        raise

    # Step 4: Create Ingest Pipeline
    print("\n4. Creating ingest pipeline...")
    try:
        pipeline_body = {
            "description": "Generate embeddings for note content",
            "processors": [
                {
                    "text_embedding": {
                        "model_id": embedding_model_id,
                        "field_map": {
                            "content": "content_embedding"
                        },
                    }
                }
            ],
        }
        client.ingest.put_pipeline(id=ingest_pipeline, body=pipeline_body)
        config['INGEST_PIPELINE'] = ingest_pipeline
        print(f"✓ Ingest pipeline created: {ingest_pipeline}")
    except Exception as e:
        print(f"✗ Error creating ingest pipeline: {e}")
        raise

    # Step 5: Create Notes Index
    print("\n5. Creating notes index...")
    try:
        # Delete index if it exists
        if client.indices.exists(index=index_name):
            client.indices.delete(index=index_name)
            print(" Deleted existing notes index")
        client.indices.create(index=index_name, body=_notes_index_body(ingest_pipeline))
        print("✓ Notes index created")
    except RequestError as e:
        if 'resource_already_exists_exception' in str(e):
            print(" Index already exists, skipping...")
        else:
            print(f"✗ Error creating notes index: {e}")
            raise
    except Exception as e:
        print(f"✗ Error creating notes index: {e}")
        raise
    # Recorded on both the "created" and the "already exists" path so the
    # returned settings are complete either way.
    config['INDEX_NAME'] = index_name

    # Step 6: Register and Deploy Highlighting Model
    print("\n6. Registering highlighting model...")
    try:
        highlighting_model_body = {
            "name": "amazon/sentence-highlighting/opensearch-semantic-highlighter-v1",
            "version": "1.0.0",
            "model_format": "TORCH_SCRIPT",
            "function_name": "QUESTION_ANSWERING",
        }
        highlighting_model_id = _register_model(
            client, 'Highlighting', highlighting_model_body, deploy=True
        )
        config['HIGHLIGHTING_MODEL_ID'] = highlighting_model_id
        print(f"✓ Highlighting model registered and deployed: {highlighting_model_id}")
    except Exception as e:
        print(f"✗ Error registering highlighting model: {e}")
        raise

    # Step 7: Create Hybrid RRF Search Pipeline
    print("\n7. Creating hybrid RRF search pipeline...")
    try:
        rrf_pipeline_body = {
            "description": "Post processor for hybrid RRF search",
            "phase_results_processors": [
                {
                    "score-ranker-processor": {
                        "combination": {
                            "technique": "rrf"
                        }
                    }
                }
            ],
        }
        _send_json(client, 'PUT', f'/_search/pipeline/{rrf_pipeline}', rrf_pipeline_body)
        config['RRF_PIPELINE'] = rrf_pipeline
        print("✓ Hybrid RRF search pipeline created")
    except Exception as e:
        print(f"✗ Error creating RRF pipeline: {e}")
        raise

    # Step 8: Create DeepSeek Chat Connector
    print("\n8. Creating DeepSeek Chat connector...")
    try:
        deepseek_key = os.getenv('DEEPSEEK_API_KEY', '')
        if not deepseek_key:
            # Best effort is kept (the search-only steps above are still
            # useful), but the empty credential is no longer silent.
            print(" Warning: DEEPSEEK_API_KEY is not set; the connector will be created with an empty credential")
        connector_body = {
            "name": "DeepSeek Chat",
            "description": "Test connector for DeepSeek Chat",
            "version": "1",
            "protocol": "http",
            "parameters": {
                "endpoint": "api.deepseek.com",
                "model": "deepseek-chat",
            },
            "credential": {
                "deepSeek_key": deepseek_key
            },
            "actions": [
                {
                    "action_type": "predict",
                    "method": "POST",
                    "url": "https://${parameters.endpoint}/v1/chat/completions",
                    "headers": {
                        "Content-Type": "application/json",
                        "Authorization": "Bearer ${credential.deepSeek_key}",
                    },
                    "request_body": "{ \"model\": \"${parameters.model}\", \"messages\": ${parameters.messages} }",
                }
            ],
        }
        response = _send_json(
            client, 'POST', '/_plugins/_ml/connectors/_create', connector_body
        )
        connector_id = response.get('connector_id')
        config['CONNECTOR_ID'] = connector_id
        print(f"✓ DeepSeek Chat connector created: {connector_id}")
    except Exception as e:
        print(f"✗ Error creating connector: {e}")
        raise

    # Step 9: Register Remote Model (DeepSeek Chat)
    print("\n9. Registering remote DeepSeek Chat model...")
    try:
        remote_model_body = {
            "name": "DeepSeek Chat model",
            "function_name": "remote",
            "description": "DeepSeek Chat",
            "model_group_id": model_group_id,
            "connector_id": connector_id,
        }
        remote_model_id = _register_model(
            client, 'Remote', remote_model_body, deploy=True
        )
        config['REMOTE_MODEL_ID'] = remote_model_id
        print(f"✓ Remote model registered and deployed: {remote_model_id}")
    except Exception as e:
        print(f"✗ Error registering remote model: {e}")
        raise

    # Step 10: Create RAG Search Pipeline
    print("\n10. Creating RAG search pipeline...")
    try:
        rag_pipeline_body = {
            "response_processors": [
                {
                    "retrieval_augmented_generation": {
                        "tag": "Demo pipeline",
                        "description": "Demo pipeline Using DeepSeek Chat",
                        "model_id": remote_model_id,
                        "context_field_list": ["title", "content", "category", "tags"],
                        "system_prompt": "You are a helpful assistant that helps users explore and understand their personal notes. Use the provided context from the user's notes to answer questions accurately and helpfully. Reference specific notes by title when relevant. If the context doesn't contain enough information, acknowledge this politely. Be conversational and helpful.",
                        "user_instructions": "Answer based on these notes from my collection",
                    }
                }
            ]
        }
        _send_json(client, 'PUT', f'/_search/pipeline/{rag_pipeline}', rag_pipeline_body)
        config['CHATBOT_PIPELINE'] = rag_pipeline
        print("✓ RAG search pipeline created")
    except Exception as e:
        print(f"✗ Error creating RAG pipeline: {e}")
        raise

    print("\n" + "=" * 60)
    print("OpenSearch Setup Complete!")
    print("=" * 60)

    # Return environment variables
    env_vars = {
        'OPENSEARCH_HOST': os.getenv('OPENSEARCH_HOST', 'localhost'),
        'OPENSEARCH_PORT': os.getenv('OPENSEARCH_PORT', '9200'),
        **config,
    }
    print("\nEnvironment Variables:")
    print("-" * 60)
    for key, value in env_vars.items():
        print(f"{key}={value}")
    print("-" * 60)
    return env_vars
if __name__ == '__main__':
    # Script entry point: run the full setup, echo the resulting settings in
    # .env form, and exit with status 1 if any step raised.
    try:
        env_vars = setup_opensearch()
        print("\n✓ Setup completed successfully!")
        print("\nYou can now use these environment variables in your .env file:")
        for key, value in env_vars.items():
            print(f"{key}={value}")
    except Exception as e:
        print(f"\n✗ Setup failed: {e}")
        import traceback
        traceback.print_exc()
        # The bare ``exit`` name is injected by the ``site`` module for
        # interactive sessions and is not guaranteed to exist (e.g. under
        # ``python -S``); raising SystemExit works in every interpreter.
        raise SystemExit(1)