Skip to content

feat(knowledge): add OpenSearch as a first-class knowledge backend#228

Open
rhossi wants to merge 34 commits into
NVIDIA-AI-Blueprints:developfrom
rhossi:feat/opensearch-aoss
Open

feat(knowledge): add OpenSearch as a first-class knowledge backend#228
rhossi wants to merge 34 commits into
NVIDIA-AI-Blueprints:developfrom
rhossi:feat/opensearch-aoss

Conversation

@rhossi
Copy link
Copy Markdown

@rhossi rhossi commented May 12, 2026

Summary

Adds OpenSearch as a built-in knowledge backend in AIQ 2.0, alongside the
existing LlamaIndex (ChromaDB) and Foundational RAG (Milvus) options.
Selectable through the standard backend: opensearch field in workflow YAML;
no fork, no custom base image.

OpenSearch is broadly available — self-hosted, and as a managed offering on
every major cloud (Amazon OpenSearch Service, Amazon OpenSearch Serverless,
others). Until now, customers running OpenSearch had to write their own
BaseRetriever/BaseIngestor adapter, rebuild base images to include it,
and handle their environment's authentication themselves — including inside
the Dask workers during ingestion. This PR removes that burden and
establishes a plugin pattern that other native backends can follow.

What is included

  • OpenSearchRetriever + OpenSearchIngestor extending BaseRetriever /
    BaseIngestor, registered through the knowledge-layer factory, exposed as
    a NAT function (backend: opensearch in YAML).
  • Three auth modes: none (development clusters), basic (self-hosted
    with username/password), sigv4 (Amazon OpenSearch Service via
    aws_service: es, Amazon OpenSearch Serverless via aws_service: aoss).
    Pluggable enough for additional auth schemes to slot in as new Literal
    values without adapter rewrites.
  • Authentication inside the Dask worker process: the ingestion task
    constructs its own OpenSearch client inside each worker, so credentials
    resolve from the worker's environment — works with EKS Pod Identity,
    IRSA, SSO, env profiles, and instance profiles. No signer state is
    serialized across the cluster.
  • Session isolation: AIQ collections map to per-session OpenSearch
    indexes (<prefix>-s_<uuid>) inside a single persistent OpenSearch
    cluster, with 24-hour TTL cleanup driven by _meta.updated_at. Lets
    multiple sessions share one OpenSearch resource without cross-session
    leakage.
  • Helm chart override surface: example values at
    deploy/helm/examples/aws-opensearch-serverless-values.yaml shows the
    image, imagePullSecrets, and secretEnv shape for a managed-serverless
    EKS deployment. The chart's existing schema accepts the override directly —
    no chart fork required to point at a custom image or environment.
  • Reference workflow YAML: configs/config_web_opensearch.yml
    (env-substitution driven; the same file works for self-hosted, managed,
    and serverless OpenSearch backends).
  • Reference deployment guide:
    docs/source/deployment/aws-opensearch-serverless.md — end-to-end
    walkthrough for one concrete cloud (Amazon OpenSearch Serverless on EKS
    with Pod Identity), covering architecture, prerequisites, OpenSearch
    collection setup, IAM trust policy, data access policy, Pod Identity
    association, image pull secret, embedding endpoint, verification, and
    cleanup. The same pattern adapts to other cloud or self-hosted OpenSearch
    deployments.

Test plan

All paths validated against a real Amazon OpenSearch Serverless collection
(SigV4 via assumed role) prior to PR submission:

  • Adapter unit tests (tests/knowledge_layer_tests/test_opensearch_adapter.py) — 19 passed.
  • AOSS live test suite (tests/knowledge_layer_tests/test_opensearch_serverless_live.py) —
    2 passed in 160s. Covers SigV4 health, collection lifecycle, vector
    ingest, k-NN retrieval, filtered k-NN, and AOSS-aware delete batching.
  • Reference YAML loads cleanly under nat serve against a live
    OpenSearch endpoint.
  • Local-mode ingestion: POST /v1/collections/<name>/documents → embed
    → bulk index → status completed. Verified via direct doc-count
    query.
  • Dask-mode ingestion: separate scheduler + worker; _bulk issued from
    the worker process (verified absent from backend log). Authentication
    resolved inside the worker.
  • File deletion: search-then-bulk-delete pattern with eventual visibility
    wait. Doc count drops to 0 in the target index; other indexes
    unaffected.
  • Real PDF ingestion: 30-page paper → 30 chunks via pypdf → multi-batch
    embedding → bulk index in 5.3s. Page-level citations preserved through
    retrieval.
  • UI flow with session-bound collection: <prefix>-s_<uuid> index
    appears in OpenSearch alongside API-driven collections.
  • Sphinx docs build under strict mode (SPHINXOPTS="-W --keep-going -n")
    — zero warnings.
  • Reference YAML and helm example values both parse cleanly.

CONTRIBUTING.md compliance

  • All 26 commits carry Signed-off-by: trailers per the DCO requirement
    (git rebase --signoff across the branch).
  • Manual testing complete per §4 (no CI/CD in this repo today).

rhossi and others added 26 commits May 6, 2026 09:58
…AOSS support

Ships OpenSearchRetriever and OpenSearchIngestor as built-in BaseRetriever and
BaseIngestor implementations selectable via knowledge_retrieval YAML
(backend: opensearch). Supports auth modes none, basic, and sigv4 with the
aoss/es service split, AOSS-aware defaults (allow_document_ids, bulk_refresh),
HNSW knn_vector mappings, TTL-driven session collection cleanup, and Dask
ingestion that constructs the OpenSearch client inside each worker so SigV4
credentials resolve in the worker process (Pod Identity, SSO, env profiles).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Unit tests cover index lifecycle, ingestion (local and Dask paths), retrieval,
deletion, and TTL cleanup. Two opt-in live suites validate against a real
OpenSearch endpoint: a generic suite for self-hosted/basic/sigv4 setups and a
dedicated AOSS suite that exercises SigV4 health checks, eventual visibility,
and aoss-specific delete batching. Adapter compliance harness extended to
include the opensearch backend.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Reference YAML wires the chat_deepresearcher_agent stack to the OpenSearch
knowledge backend with env-substitution defaults so the same file works for
self-hosted clusters and Amazon OpenSearch Serverless without edits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Adds OpenSearch to the available backends matrix, switching examples for
self-hosted (none/basic) and AOSS (sigv4 with aoss/es), Dask ingestion mode
guidance, three live-test recipes, and OpenSearch-specific environment
variables.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Initial Amazon OpenSearch Serverless deployment guide covering workflow
config, EKS Pod Identity association command, helm values override, local
SSO live-test recipe, and a troubleshooting table. Helm README adds an
OpenSearch section with image override and SigV4 env wiring. The example
helm values file provides a starting point for AOSS deployments. Follow-up
plan deepens this guide into an end-to-end EKS walkthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Twelve-task plan that turns the initial AOSS deployment guide into a
self-contained end-to-end EKS walkthrough: migration callout, architecture
diagram, prerequisites, AOSS collection creation, IAM trust and permissions,
data access policy, Pod Identity association, image pull secret, embedding
endpoint, verification, teardown, cross-link, and strict docs build.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
…on creation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
…missions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
…olicy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
…guide

Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
… install

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Add aws-opensearch-serverless.md to the Deployment toctree in index.md so
the strict Sphinx build (-W -n) passes without the toc.not_included warning.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
NVIDIA NIM hosted asymmetric embedding models (default
nvidia/llama-nemotron-embed-vl-1b-v2) reject /v1/embeddings calls without
input_type. The adapter went direct-to-OpenAI-SDK and hit a 400 on every
ingestion. Pass input_type=passage from OpenSearchIngestor and input_type=query
from OpenSearchRetriever via extra_body so the OpenAI client forwards the
parameter to NIM.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
…dropped on /v1/chat/completions

The nat framework only reads Context.conversation_id from the
conversation-id HTTP request header; the ChatRequest Pydantic model does
not declare a conversation_id field and any value supplied in the JSON
body is discarded because the model uses extra="allow". Document the
correct workaround (pass the header) and note that the permanent fix
belongs upstream in the nat/aiq_api repository.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Eight-task plan executed alongside live AOSS validation: fail-fast on
missing NVIDIA_API_KEY for the hosted embeddings API, text-only ingestion
callout, health-endpoint response correction, AOSS visibility-delay note,
conversation_id triage (upstream nat package — documented workaround via
conversation-id header), Dask-worker logging gotcha, DCO sign-off rebase,
and a strict docs build quality gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 12, 2026

Greptile Summary

This PR adds OpenSearch as a first-class knowledge backend alongside the existing LlamaIndex and Foundational RAG options, implementing OpenSearchRetriever and OpenSearchIngestor with three auth modes (none, basic, SigV4), Dask-based distributed ingestion, composite-pagination file listing, and a TTL cleanup mechanism for session-bound indexes.

  • Core adapter (sources/knowledge_layer/src/opensearch/adapter.py): 1 553-line implementation covering index lifecycle, chunking, embedding, bulk indexing, AOSS-aware delete, and composite-aggregation-based file listing. All five issues flagged in the previous review round are resolved.
  • Distributed worker (distributed.py): Dask worker entry point that reconstructs its own OpenSearchIngestor inside the worker so SigV4 credentials resolve from the worker environment.
  • Config & registration (register.py, configs/config_web_opensearch.yml, deploy/helm/examples/): Extends KnowledgeRetrievalConfig with ~25 new OpenSearch fields backed by env vars, adds the backend to the factory, and ships a reference YAML and Helm values example.

Confidence Score: 5/5

Safe to merge. All five issues raised in the previous review round are resolved and the new code introduces no correctness regressions or data-safety concerns.

The adapter core paths are implemented correctly and backed by a 1100+ line unit test suite. Remaining findings are efficiency and consistency suggestions with no impact on correctness.

The _collection_info_from_index method in adapter.py and validate_backend_config in register.py are worth a second look before the feature sees heavy multi-tenant load.

Important Files Changed

Filename Overview
sources/knowledge_layer/src/opensearch/adapter.py Core 1553-line adapter. All previously-reviewed P1 issues are resolved. N+1 calls in list_collections and per-call OpenAI client allocation are the remaining observations.
sources/knowledge_layer/src/opensearch/distributed.py Dask worker entry point. Correctly reconstructs its own ingestor so credentials resolve locally; temp files cleaned up in finally.
sources/knowledge_layer/src/register.py Adds ~25 OpenSearch fields to the shared KnowledgeRetrievalConfig. Missing cross-backend warnings for embed_model/embed_base_url.
configs/config_web_opensearch.yml Reference YAML using env-substitution for all sensitive values. Clean and portable.
tests/knowledge_layer_tests/test_opensearch_adapter.py Comprehensive 1132-line unit test suite with fake OpenSearch clients covering all major code paths.

Sequence Diagram

sequenceDiagram
    participant Client as API Client
    participant Ingestor as OpenSearchIngestor
    participant Dask as Dask Worker
    participant OS as OpenSearch / AOSS

    Client->>Ingestor: submit_job(file_paths, collection)
    Ingestor->>Ingestor: _ensure_index (race-safe)
    alt Local mode
        Ingestor->>Ingestor: _start_local_ingestion (thread)
        Ingestor-->>Client: job_id (immediate)
        Ingestor->>OS: bulk index chunks
    else Dask mode
        Ingestor->>Dask: client.submit(run_opensearch_ingestion_task)
        Ingestor-->>Client: job_id (immediate)
        Ingestor->>Ingestor: _monitor_dask_ingestion (thread)
        Dask->>Dask: reconstruct OpenSearchIngestor
        Dask->>OS: bulk index chunks (SigV4 resolved in worker)
        Dask-->>Ingestor: result dict
        Ingestor->>Ingestor: _apply_dask_ingestion_result
    end

    Client->>Ingestor: retrieve(query, collection)
    Ingestor->>OS: knn_vector search + filter
    OS-->>Ingestor: hits
    Ingestor-->>Client: RetrievalResult (Chunks)

    loop TTL cleanup (every 1h)
        Ingestor->>OS: list_collections
        OS-->>Ingestor: collections + updated_at
        Ingestor->>OS: delete_collection (expired indexes)
    end
Loading

Reviews (8): Last reviewed commit: "fix(opensearch): stop indexing internal ..." | Re-trigger Greptile

Comment thread sources/knowledge_layer/src/opensearch/adapter.py
Comment thread sources/knowledge_layer/src/opensearch/adapter.py
Comment thread sources/knowledge_layer/src/opensearch/adapter.py
…e races

Two concurrent ingestion jobs targeting the same uncreated collection could
both pass the pre-create exists() check, both call indices.create(), and the
loser would raise resource_already_exists_exception (HTTP 400) which
propagated through _run_ingestion's outer except Exception and marked the
losing job as FAILED.

Wrap the create() in try/except and re-check exists() on failure. If the
index is now present, the other worker won and we proceed. Otherwise the
create failed for a real reason and the original exception propagates.

Two new unit tests: one covers the race recovery, the other guards against
silently swallowing non-race create errors.

Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Comment thread sources/knowledge_layer/src/opensearch/adapter.py
rhossi and others added 4 commits May 12, 2026 09:59
The previous query body used "size": 10000, which is the OpenSearch
index.max_result_window cap. For collections holding more than 10 000 chunks
the search silently returned only the first 10 000 hits, _files_from_hits
grouped those into file objects, and the returned list (plus the file_count
in _collection_info_from_index) was silently incomplete.

Switch list_files to use a terms aggregation on file_name with a top_hits
sub-aggregation for representative metadata and a content_types sub-agg for
the per-file content-type set. Chunk counts now come from bucket doc_count,
not from counted hits, so a single file with 50k chunks is reported as one
file with 50k chunks — not 10k truncated chunks across N files.

Rename _files_from_hits to _files_from_aggregations to reflect the new
response shape. Update the FakeOpenSearchClient test fake to compute
aggregations alongside its existing hits path so the end-to-end test
continues to exercise the new query.

Adds a unit test that mocks the OpenSearch client to return an aggregation
response with a 50 000-chunk bucket; asserts the body uses size:0 + aggs
and that the returned chunk_count reflects bucket doc_count rather than
truncated hits.

Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
_create_dask_client() opens a TCP connection to the scheduler. If the
subsequent client.submit() raised — scheduler unreachable, serialisation
error, key conflict — the method exited via exception and the client
reference went out of scope without close() being called. The monitor
thread, which is the only place that calls client.close(), is only started
when submit succeeds. In auto mode the immediate local-mode fallback kept
running while connections accumulated; over repeated retries the scheduler
could exhaust its connection limit.

Wrap submit() in try/except, close the client on failure, then re-raise so
the caller's existing dask-vs-auto handling stays intact. The close() call
itself is guarded so a close failure cannot mask the original submit
exception.

Add a unit test using a Dask client fake that raises on submit and tracks
close(); asserts close was invoked, the job is marked FAILED, and the
original error message propagates.

Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
delete_file's in-memory cleanup block ran unconditionally, even when
delete_by_query (or the AOSS bulk-delete fallback) reported deleted == 0.
The bad case: delete_file arrives while a file is still UPLOADING or
INGESTING. OpenSearch has no documents to delete yet, so deleted == 0,
but the in-memory tracking entry is still removed. Subsequent
get_file_status calls then fall through to the empty-index scan path and
return None or stale data — losing the live job state.

Move the in-memory eviction inside the `if deleted > 0:` branch so the
in-memory state is only cleared when OpenSearch actually had documents
for that file. Non-terminal in-flight entries (UPLOADING / INGESTING)
survive no-op deletes.

Adds a unit test: pre-populate the ingestor with an INGESTING file_id,
call delete_file on an empty index, assert the tracking entry and its
status are still observable via get_file_status.

Signed-off-by: Felipe Garcia <fdecarvalhop@nvidia.com>
Resolve E501 line-length violations and I001 import-sort issues flagged
by CI. No behavioral changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread sources/knowledge_layer/src/opensearch/adapter.py Outdated
The terms aggregation silently capped distinct files at size=10000 —
collections with more files dropped the rest with no warning, and
_collection_info_from_index propagated the wrong file_count. Switch to
a composite aggregation with after_key pagination so every distinct
file_name is enumerated; the for/else runaway guard logs (rather than
silently truncates) if max_pages is hit. _files_from_aggregations
renamed to _files_from_buckets and handles composite's dict-key shape.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@AjayThorve AjayThorve added AIQ2.2 enhancement New feature or request labels May 12, 2026
@AjayThorve
Copy link
Copy Markdown
Collaborator

Thank you @rhossi , we will review this soon

Latent format drift from the original test-suite add (70ba335) was
masked by earlier ruff-check failures aborting the CI script before
ruff format --check ran. Normalize now so the whole tree passes
ruff format --check .

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread sources/knowledge_layer/src/opensearch/adapter.py
source_path was written into every chunk's metadata and propagated by
normalize() into every Chunk surfaced to API consumers and LLM context.
For byte-upload paths (Dask mode, upload_file) this leaked temp paths
such as /tmp/tmpXXXXXX.pdf. The field was write-only — no production
code or test read it — so drop it from the indexed document. Regression
test added asserting no indexed chunk and no retrieved Chunk metadata
carries source_path. Flagged by Greptile (P1) on PR NVIDIA-AI-Blueprints#228.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AIQ2.2 enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants