Skip to content

Latest commit

 

History

History
487 lines (373 loc) · 15.1 KB

File metadata and controls

487 lines (373 loc) · 15.1 KB

Adding a Data Source

This guide walks through creating a new data source plugin. Data sources are specialized tools that provide domain-specific search capabilities -- academic papers, patent databases, financial data, internal knowledge bases, and similar services.

The pattern follows the existing Google Scholar paper search at sources/google_scholar_paper_search/.


Data Source vs. Tool

A data source is architecturally identical to a tool. The distinction is conceptual:

Aspect Tool Data Source
Purpose General utility (web search, code execution) Domain-specific information retrieval
Location sources/ sources/
Registration @register_function @register_function
Agent interaction Agent calls it for actions Agent calls it for knowledge

Both use the same NeMo Agent Toolkit plugin pattern. This guide focuses on data-source-specific patterns like structured result formatting, pagination, and search parameter handling.


Prerequisites

  • The repository virtual environment is active (.venv)
  • You understand NeMo Agent Toolkit's @register_function decorator
  • You have access to the data source's API or SDK

Step 1: Create the Package

sources/my_data_source/
    pyproject.toml
    README.md
    src/
        __init__.py
        register.py       # Config + NAT registration
        search.py          # Search implementation
    tests/
        test_search.py
mkdir -p sources/my_data_source/src sources/my_data_source/tests
touch sources/my_data_source/src/__init__.py

Step 2: Implement the Search Client

Separate the API client logic from the NeMo Agent Toolkit registration. This makes it testable independently.

# sources/my_data_source/src/search.py

import logging
from typing import Any

import httpx

logger = logging.getLogger(__name__)


class PatentSearchClient:
    """Client for searching a patent database API."""

    def __init__(
        self,
        api_key: str,
        timeout: int = 30,
        max_results: int = 10,
    ):
        self.api_key = api_key
        self.timeout = timeout
        self.max_results = max_results

    async def search(
        self,
        query: str,
        year: str | None = None,
        jurisdiction: str | None = None,
    ) -> str:
        """Search for patents matching the query.

        Args:
            query: The search query describing the invention or technology.
            year: Optional year filter (e.g., "2024").
            jurisdiction: Optional jurisdiction filter (e.g., "US", "EP").

        Returns:
            Formatted patent search results as a string.
        """
        params: dict[str, Any] = {
            "q": query,
            "limit": self.max_results,
        }
        if year:
            params["year"] = year
        if jurisdiction:
            params["jurisdiction"] = jurisdiction

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                "https://api.patents.example.com/v2/search",
                params=params,
                headers={"X-API-Key": self.api_key},
            )
            response.raise_for_status()
            data = response.json()

        patents = data.get("patents", [])
        if not patents:
            return f"No patents found for query: {query}"

        results = []
        for patent in patents:
            patent_id = patent.get("id", "")
            title = patent.get("title", "")
            abstract = patent.get("abstract", "")
            filing_date = patent.get("filing_date", "")
            url = patent.get("url", "")

            results.append(
                f"**{title}** ({patent_id})\n"
                f"Filed: {filing_date}\n"
                f"Abstract: {abstract}\n"
                f"Link: {url}"
            )

        return "\n\n---\n\n".join(results)

Step 3: Define Config and Register

# sources/my_data_source/src/register.py

import logging
import os

from pydantic import Field, SecretStr

from nat.builder.builder import Builder
from nat.builder.function_info import FunctionInfo
from nat.cli.register_workflow import register_function
from nat.data_models.function import FunctionBaseConfig

from .search import PatentSearchClient

logger = logging.getLogger(__name__)

_missing_key_warned = False


class PatentSearchConfig(FunctionBaseConfig, name="patent_search"):
    """
    Configuration for the patent search data source.

    Searches a patent database API for relevant patents.
    Requires a PATENT_API_KEY environment variable or config.
    """

    timeout: int = Field(
        default=30, description="Timeout in seconds for search requests"
    )
    max_results: int = Field(
        default=10, description="Maximum number of results to return"
    )
    api_key: SecretStr | None = Field(
        default=None, description="API key for the patent search service"
    )


@register_function(config_type=PatentSearchConfig)
async def patent_search(tool_config: PatentSearchConfig, builder: Builder):
    """Register patent search data source."""

    # Resolve API key
    if not os.environ.get("PATENT_API_KEY") and tool_config.api_key:
        os.environ["PATENT_API_KEY"] = tool_config.api_key.get_secret_value()

    api_key = os.environ.get("PATENT_API_KEY")

    if not api_key:
        global _missing_key_warned
        if not _missing_key_warned:
            logger.warning(
                "PATENT_API_KEY not found. The patent search tool will be "
                "registered but will return an error when called."
            )
            _missing_key_warned = True

        async def _stub(query: str, year: str | None = None) -> str:
            """Patent search (unavailable - missing PATENT_API_KEY)."""
            return (
                "Error: Patent search is unavailable because PATENT_API_KEY is not set.\n"
                "Set the API key in your environment or .env file and restart."
            )

        yield FunctionInfo.from_fn(_stub, description=_stub.__doc__)
        return

    client = PatentSearchClient(
        api_key=api_key,
        timeout=tool_config.timeout,
        max_results=tool_config.max_results,
    )

    async def _patent_search(
        query: str,
        year: str | None = None,
        jurisdiction: str | None = None,
    ) -> str:
        """Searches for patents and intellectual property filings.

        This tool returns patents from a patent database with titles,
        abstracts, filing dates, and links. Use for queries about
        inventions, prior art, technology patents, and IP research.
        """
        return await client.search(query, year, jurisdiction)

    yield FunctionInfo.from_fn(
        _patent_search,
        description=_patent_search.__doc__,
    )

Step 4: Create pyproject.toml

# sources/my_data_source/pyproject.toml

[build-system]
build-backend = "setuptools.build_meta"
requires = ["setuptools >= 64", "setuptools-scm>=8"]

[tool.setuptools]
packages = ["my_data_source"]
package-dir = {"my_data_source" = "src"}

[project]
name = "my-data-source"
version = "1.0.0"
description = "NAT-based patent search data source"
requires-python = ">=3.11,<3.14"
dependencies = [
    "nvidia-nat==1.5.0",
    "httpx>=0.24.0",
    "pydantic>=2.0.0",
]

[project.entry-points."nat.plugins"]
patent_search = "my_data_source.register"

Install:

uv pip install -e ./sources/my_data_source

Step 5: Register as a Data Source in YAML

To make your data source appear in the UI as a toggleable source with per-message filtering, add it to the data_source_registry in your config. Agents automatically inherit all registry tools, so this is the only config change needed:

functions:
  # Register data sources for UI toggles and filtering
  data_sources:
    _type: data_source_registry
    sources:
      - id: web_search
        name: "Web Search"
        description: "Search the web for real-time information."
        tools:
          - web_search_tool
      - id: patent_search
        name: "Patent Search"
        description: "Search patent databases for intellectual property filings."
        tools:
          - patent_tool

  # Define the tool instances
  patent_tool:
    _type: patent_search
    max_results: 5
    timeout: 15

  web_search_tool:
    _type: tavily_web_search
    max_results: 3

  # Agents inherit all registry tools automatically --
  # no need to list tools per-agent unless you want to exclude specific ones
  shallow_research_agent:
    _type: shallow_research_agent
    llm: research_llm

The data_source_registry provides:

  • GET /v1/data_sources API endpoint returns the registered sources (the UI renders these as toggles)
  • Per-message filtering via data_sources: ["web_search"] in the WebSocket chat payload, or as a data_sources field on POST /v1/jobs/async/submit -- only tools belonging to selected sources are active
  • Display metadata (name, description) shown in the UI
  • Auth gating -- set requires_auth: true on a source to grey it out in the UI until the user signs in (e.g., enterprise sources that need user-level OAuth tokens). Sources using backend API keys (Tavily, Serper) should leave this false (the default).
  • Auto-inheritance -- all agents get every registered tool by default (use exclude_tools on an agent for per-agent specialization)

If a tool isn't listed in any data_source_registry source entry, it is always included regardless of filtering (e.g., utility tools like "think" or "calculator"). Passing an explicit empty list (data_sources: []) -- in the WebSocket chat payload or in a POST /v1/jobs/async/submit body -- disables data-source tools while leaving those unmapped utility tools available.

Source Entry Field Reference

Field Type Default Description
id string required Unique key used in API payloads and filtering
name string required Display name shown in the UI
description string "" Human-readable description for the UI
tools list [] NAT function or function group names belonging to this source
requires_auth bool false Grey out in UI until user signs in (for user-token sources)
default_enabled bool true Whether enabled by default in the UI

Using MCP Tools as Data Sources

NAT MCP client tools are function groups. They work with the data source registry the same way as any other function group -- just reference the group name in tools:

function_groups:
  mcp_docs_search:
    _type: mcp_client
    server:
      transport: streamable-http
      url: "http://localhost:9901/mcp"

functions:
  data_sources:
    _type: data_source_registry
    sources:
      - id: web_search
        name: "Web Search"
        description: "Search the web for real-time information."
        tools:
          - web_search_tool
      - id: docs_search
        name: "Internal Docs"
        description: "Search internal documentation via MCP."
        tools:
          - mcp_docs_search

  web_search_tool:
    _type: tavily_web_search

  deep_research_agent:
    _type: deep_research_agent
    tools:
      - web_search_tool
      - mcp_docs_search

The registry auto-detects that mcp_docs_search is a function group and uses NAT's group separator (__) for prefix matching. All tools exposed by the MCP server (e.g., mcp_docs_search__find_pages, mcp_docs_search__get_page) will map to the docs_search data source.


Data Source Design Patterns

Multi-Parameter Search Functions

Data sources often accept optional filters beyond the query string. Expose these as optional parameters with clear type annotations:

async def _search(
    query: str,
    year: str | None = None,
    jurisdiction: str | None = None,
    category: str | None = None,
) -> str:
    """Search patents with optional filters.

    Args:
        query: Natural language description of the technology.
        year: Filter by filing year (e.g., "2024").
        jurisdiction: Filter by jurisdiction ("US", "EP", "CN").
        category: Filter by patent category.
    """

The LLM will use the parameter descriptions to decide which filters to apply.

Structured Result Formatting

Format results so the agent can extract citations. Use a consistent pattern:

# XML Document format (matches Tavily pattern)
f'<Document href="{url}">\n<title>\n{title}\n</title>\n{content}\n</Document>'

# Or structured text format
f"**{title}** ({id})\nAbstract: {abstract}\nLink: {url}"

Pagination

If the source API supports pagination, handle it internally rather than exposing it to the agent:

async def search(self, query: str) -> str:
    all_results = []
    offset = 0

    while len(all_results) < self.max_results:
        batch = await self._fetch_page(query, offset, batch_size=20)
        if not batch:
            break
        all_results.extend(batch)
        offset += len(batch)

    return self._format_results(all_results[:self.max_results])

Rate Limiting and Retries

import asyncio

async def search(self, query: str) -> str:
    for attempt in range(self.max_retries):
        try:
            return await self._do_search(query)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                wait = 2 ** attempt
                logger.warning(f"Rate limited, retrying in {wait}s")
                await asyncio.sleep(wait)
            elif attempt == self.max_retries - 1:
                return f"Error: Search failed after {self.max_retries} attempts - {e}"
            else:
                await asyncio.sleep(1)

Existing Data Source Reference

Data Source _type Package Description
Tavily Web Search tavily_web_search sources/tavily_web_search General web search through Tavily API
Exa Web Search exa_web_search sources/exa_web_search General web search through Exa API
Google Scholar paper_search sources/google_scholar_paper_search Academic papers through Serper/Google Scholar
Knowledge Layer knowledge_retrieval sources/knowledge_layer Document retrieval through pluggable backends

Checklist

  • Package created under sources/<name>/ with pyproject.toml
  • Search client implemented separately from registration
  • Config class with FunctionBaseConfig and unique name
  • @register_function with stub fallback for missing API key
  • Clear docstring explaining what the data source searches
  • Optional search parameters with descriptions for the LLM
  • Proper error handling and retries
  • Entry point in pyproject.toml
  • Added to data_source_registry in YAML config for UI integration
  • Installed and tested with nat run

Related