Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion nodes/src/nodes/response/IInstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import os
import base64

from rocketlib import IInstanceBase
from rocketlib import IInstanceBase, IJson
from ai.common.schema import Doc, Question, Answer
from ai.common.image import ImageProcessor
from rocketlib import AVI_ACTION, Entry, debug
Expand Down Expand Up @@ -158,6 +158,17 @@ def writeTable(self, table: str):
# Add the table
self.instance.currentObject.response[key].append(table)

def writeJson(self, data: IJson):
# Get the key to write to (official lane name is "json")
key = self._getkey('json')

# If it isn't there, create it
if key not in self.instance.currentObject.response:
self.instance.currentObject.response[key] = []

# Add the json
self.instance.currentObject.response[key].append(IJson.toDict(data))

def writeDocuments(self, documents: List[Doc]):
# Get the key to write to
key = self._getkey('documents')
Expand Down
13 changes: 11 additions & 2 deletions nodes/src/nodes/response/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Data handling per lane:

- **text** - chunks are accumulated (joined with blank lines) and appended as one string per object.
- **table**, **documents**, **questions** - appended as-is; documents and questions are serialized to plain dicts.
- **json** - appended as-is (one entry per write).
- **answers** - appended as parsed JSON when the answer is JSON, otherwise as plain text.
- **image** - streamed chunks are buffered via AVI_ACTION signals (BEGIN / WRITE / END), then the complete image is base64-encoded and appended as `{"mime_type": ..., "image": ...}`.
- **audio**, **video** - the raw bytes are not returned; only tracking metadata is appended: `{"url", "aviAction", "mimeType", "size"}`.
Expand All @@ -22,15 +23,16 @@ The node has no Python dependencies of its own (`requirements.txt` is empty); it

## Service variants

The same implementation is registered as nine services. The generic **HTTP Results** service (`response://`) accepts all eight lane types and lets you map each lane to its own result key. Eight single-lane variants accept exactly one lane each and expose a single `laneName` field:
The same implementation is registered as ten services. The generic **HTTP Results** service (`response://`) accepts all nine lane types and lets you map each lane to its own result key. Nine single-lane variants accept exactly one lane each and expose a single `laneName` field:

| Service title | Protocol | Lane | Default result key |
|------------------|---------------------------|-------------|--------------------|
| HTTP Results | `response://` | all eight | the lane type name |
| HTTP Results | `response://` | all nine | the lane type name |
| Return Answers | `response_answers://` | `answers` | `answers` |
| Return Audio | `response_audio://` | `audio` | `audio` |
| Return Documents | `response_documents://` | `documents` | `documents` |
| Return Image | `response_image://` | `image` | `image` |
| Return JSON | `response_json://` | `json` | `json` |
Comment on lines +26 to +35

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Regenerate node docs so the generated schema includes the new JSON variant.

Line 26/Line 35 documents response_json://, but the generated schema block (Line 101-165) does not list a Return JSON section. Please run nodes:docs-generate and commit the regenerated README content.

As per coding guidelines, documentation for node inputs/outputs/config must stay co-located and content between <!-- ROCKETRIDE:GENERATED:PARAMS START --> and <!-- ROCKETRIDE:GENERATED:PARAMS END --> is generator-managed.

Also applies to: 101-165

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@nodes/src/nodes/response/README.md` around lines 26 - 35, The README.md file
in nodes/src/nodes/response/ has manual documentation for the new
response_json:// service variant (lines 26-35), but the auto-generated schema
block between the ROCKETRIDE:GENERATED:PARAMS START and
ROCKETRIDE:GENERATED:PARAMS END markers (lines 101-165) is missing the
corresponding Return JSON section. Run the nodes:docs-generate command to
regenerate the documentation schema block so it includes all nine service
variants including the new JSON one, then commit the updated README file with
the regenerated content.

Source: Coding guidelines

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed, recheck

| Return Questions | `response_questions://` | `questions` | `questions` |
| Return Table | `response_table://` | `table` | `table` |
| Return Text | `response_text://` | `text` | `text` |
Expand All @@ -50,6 +52,7 @@ All lanes are inputs; the node produces no output lanes.
|-------------|----------|-------------|
| `text` | - | Captured under the configured key |
| `table` | - | Captured under the configured key |
| `json` | - | Captured under the configured key |
| `documents` | - | Captured under the configured key |
| `questions` | - | Captured under the configured key |
| `answers` | - | Captured under the configured key |
Expand Down Expand Up @@ -132,6 +135,12 @@ Each configured result key holds an array: one element per result produced for t
| `laneName` | `string` | **Result key** | |
| `lanes` | `array` | **Lanes**<br/>Each lane maps pipeline data to a custom JSON key in the response. Select the data type (text, documents, answers, etc.) for Lane Name, and enter a custom JSON key name (1-32 characters) for Result Key. | |

### Return JSON (`services.json.json`)

| Field | Type | Description | Default |
|---|---|---|---|
| `laneName` | `string` | **Identifier key within result** | `"json"` |

### Return Questions (`services.questions.json`)

| Field | Type | Description | Default |
Expand Down
2 changes: 2 additions & 0 deletions nodes/src/nodes/response/services.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"lanes": {
"text": [],
"table": [],
"json": [],
"documents": [],
"questions": [],
"answers": [],
Expand Down Expand Up @@ -104,6 +105,7 @@
["audio", "Audio"],
["documents", "Documents"],
["image", "Image"],
["json", "JSON"],
["questions", "Questions"],
["table", "Table"],
["text", "Text"],
Expand Down
111 changes: 111 additions & 0 deletions nodes/src/nodes/response/services.json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
{
//
// Required:
// The displayable name of this node
//
"title": "Return JSON",
//
// Required:
// The protocol is the endpoint protocol
//
"protocol": "response_json://",
//
// Required:
// Class type of the node - what it does
//
"classType": ["infrastructure"],
//
// Required:
// Capabilities are flags that change the behavior of the underlying
// engine
//
"capabilities": [],
//
// Optional:
// Register is either filter, endpoint or ignored if not specified. If the
// type is specified, a factory is registered of that given type
//
"register": "filter",
//
// Optional:
// The node is the actual physical node to instantiate - if
// not specified, the protocol will be used
//
"node": "python",
//
// Optional:
// The path is the executable/script code - it is node dependent
// and is optional for most node
//
"path": "nodes.response",
//
// Required:
// The prefix map when added/removed when convertting URLs <=> paths
//
"prefix": "response",
//
// Optional:
// Description to of this driver
//
"description": ["A component that returns structured JSON data back to the requesting client. It handles ", "the response phase of the HTTP request-response cycle by returning the results as a JSON ", "object. This component ensures that data is transmitted efficiently and clearly to ", "the client, providing a standardized format for the response."],
//
// Optional:
// The icon is the icon to display in the UI for this node
//
"icon": "util-infrastructure.svg",
"documentation": "https://docs.rocketride.org",
//
// Optional:
// Rendering hints to the UI which indicate which fields of
// the configuration should be used to display information
"tile": [],
//
// Optional:
// As a pipe component, define what this pipe component takes
// and what it produces
//
"lanes": {
"json": []
},
//
// Optional:
// Profile section are configuration optoins used by the driver
// itself
//
"preconfig": {
// Define the values that will be merged into any profile configuration
// specified, unless the profile is 'absolute'
"default": "default",
// Defines profiles used with the "profile": key
"profiles": {
"default": {}
}
},
//
// Optional:
// Local fields definitions - these define fields only for the
// current service. You may specify them here, or directly
// in the shape
//
"fields": {
"laneName": {
"type": "string",
"title": "Identifier key within result",
"minLength": 1,
"default": "json",
"maxLength": 32
}
},
//
// Required:
// Defines the fields (shape) of the service. Either source or target
// map be specified, or both, but at least one is required
//
"shape": [
{
"section": "Pipe",
"title": "Return JSON",
"properties": ["laneName"]
}
]
}
2 changes: 1 addition & 1 deletion nodes/src/nodes/webhook/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Each variant takes the internal `_source` input and emits to its declared output

| Variant | Lane in | Lanes out |
| -------- | ------- | ---------------------------------------------------- |
| Webhook | - | `tags`, `text`, `audio`, `video`, `image`, `questions` |
| Webhook | - | `tags`, `text`, `json`, `audio`, `video`, `image`, `questions` |
| Chat | - | `questions` |
| Dropper | - | `tags` |

Expand Down
2 changes: 1 addition & 1 deletion nodes/src/nodes/webhook/services.webhook.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
// and what it produces
//
"lanes": {
"_source": ["tags", "text", "audio", "video", "image", "questions"]
"_source": ["tags", "text", "json", "audio", "video", "image", "questions"]
},
//
// Optional:
Expand Down
12 changes: 12 additions & 0 deletions packages/ai/src/ai/modules/data/data_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
Ec,
Entry,
IInvokeTool,
IJson,
IServiceEndpoint,
IServiceFilterPipe,
getObject,
Expand Down Expand Up @@ -206,6 +207,10 @@ def _determine_lane(self, mime_type: str, pipe_instance: IServiceFilterPipe) ->
elif mime_type.startswith('application/rocketride-question') and 'questions' in listeners:
return 'questions'

# If this is json content and we have a json listener
elif mime_type.startswith('application/json') and 'json' in listeners:
return 'json'

# If this is text content and we have a text listener
elif mime_type.startswith('text/') and 'text' in listeners:
return 'text'
Expand Down Expand Up @@ -595,6 +600,13 @@ def write_sync():
string_data = data.decode('utf-8')
pipe.writeText(string_data)

elif lane == 'json':
try:
json_data = json.loads(data.decode('utf-8'))
pipe.writeJson(IJson(json_data))
except Exception as e:
raise ValueError(str(e))

elif lane == 'audio':
pipe.writeAudio(AVI_ACTION.WRITE, mime_type, data)

Expand Down
21 changes: 21 additions & 0 deletions packages/ai/tests/ai/modules/data/test_data_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ def test_determine_lane_question_without_listener_falls_back_to_raw():
assert conn._determine_lane('application/rocketride-question+json', pipe) == 'raw'


def test_determine_lane_json_with_json_listener():
"""'application/json' maps to the 'json' lane when that listener exists."""
conn = _make_conn()
pipe = _make_pipe_with_listeners(['json'])
assert conn._determine_lane('application/json', pipe) == 'json'


def test_determine_lane_json_without_listener_falls_back_to_raw():
"""Without a 'json' listener, 'application/json' falls back to raw."""
conn = _make_conn()
pipe = _make_pipe_with_listeners([])
assert conn._determine_lane('application/json', pipe) == 'raw'


def test_determine_lane_json_tolerates_charset_parameter():
"""'application/json; charset=utf-8' still maps to the 'json' lane."""
conn = _make_conn()
pipe = _make_pipe_with_listeners(['json'])
assert conn._determine_lane('application/json; charset=utf-8', pipe) == 'json'


@pytest.mark.parametrize(
'mime, listener, expected',
[
Expand Down
2 changes: 2 additions & 0 deletions packages/server/engine-lib/engLib/python/IJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ void setJsonValue(json::Value &target,
target = value.cast<double>();
else if (py::isinstance<json::Value>(value))
target = value.cast<json::Value>();
else if (py::isinstance<IJson>(value))
target = *value.cast<IJson>().getJsonValue();
else if (py::isinstance<py::dict>(value)) {
// Set it as an object
target = json::Value(json::objectValue);
Expand Down
21 changes: 21 additions & 0 deletions packages/server/engine-lib/engLib/store/core/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,27 @@ Error Binder::writeWords(const WordVector &textWords) noexcept {
return callMethods(this, "words", call, serializeTrace);
}

/**
* @brief Writes JSON data to all bound service filter instances.
*
* @param jsonData A JSON value to write.
* @return Error Returns an error code if any instance fails, otherwise success.
*/
Error Binder::writeJson(const json::Value &jsonData) noexcept {
auto call = localfcn(auto pInstance)->Error {
return pInstance->writeJson(jsonData);
};

auto serializeTrace = [&](PIPELINE_TRACE_LEVEL level, json::Value &out) {
if (level >= PIPELINE_TRACE_LEVEL::SUMMARY)
out["size"] = (int)jsonData.size();
if (level >= PIPELINE_TRACE_LEVEL::FULL)
out["json"] = jsonData;
};

return callMethods(this, "json", call, serializeTrace);
}

/**
* @brief Writes audio data to all bound service filter instances.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,21 @@ void IServiceFilterInstance::cb_sendTable(const std::u16string &text) noexcept(
}
}

void IServiceFilterInstance::cb_sendJson(
python::IJson json) noexcept(false) {
// Check to make sure source mode
if (this->endpoint->config.endpointMode != ENDPOINT_MODE::SOURCE)
throw APERR(Ec::InvalidParam,
"You must be in source mode to use sendJson");

// Unlock python and sent it along
_block() {
engine::python::UnlockPython unlock;

if (auto ccode = sendJson(*m_pTarget, *json.getJsonValue())) throw ccode;
}
}

void IServiceFilterInstance::cb_sendAudio(
const AVI_ACTION action, Text &mimeType,
const pybind11::bytes &streamData) noexcept(false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,21 @@ void IServiceFilterInstance::cb_writeWords(
}
}

void IServiceFilterInstance::cb_writeJson(
python::IJson json) noexcept(false) {
// Check to make sure target mode
if (endpoint->config.endpointMode != ENDPOINT_MODE::TARGET)
throw APERR(Ec::InvalidParam,
"You must be in target mode to use writeJson");

// Unlock python and send it along
_block() {
engine::python::UnlockPython unlock;

if (auto ccode = binder.writeJson(*json.getJsonValue())) throw ccode;
}
}

void IServiceFilterInstance::cb_writeAudio(
const AVI_ACTION action, Text &mimeType,
const pybind11::bytes &streamData) noexcept(false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ Error IServiceFilterInstance::sendWords(ServicePipe &target,
return pDown->sendWords(target, textWords);
}

Error IServiceFilterInstance::sendJson(ServicePipe &target,
const json::Value &jsonData) noexcept {
// Check the mode
if (this->endpoint->config.endpointMode != ENDPOINT_MODE::SOURCE)
return APERR(Ec::InvalidParam, "This function requires source mode");

// Send it along
return pDown->sendJson(target, jsonData);
}

Error IServiceFilterInstance::sendAudio(
ServicePipe &target, const AVI_ACTION action, Text &mimeType,
const pybind11::bytes &streamData) noexcept {
Expand Down
Loading
Loading