diff --git a/nodes/src/nodes/response/IInstance.py b/nodes/src/nodes/response/IInstance.py index 196590632..dceb24f65 100644 --- a/nodes/src/nodes/response/IInstance.py +++ b/nodes/src/nodes/response/IInstance.py @@ -25,7 +25,7 @@ import os import base64 -from rocketlib import IInstanceBase +from rocketlib import IInstanceBase, IJson from ai.common.schema import Doc, Question, Answer from ai.common.image import ImageProcessor from rocketlib import AVI_ACTION, Entry, debug @@ -158,6 +158,17 @@ def writeTable(self, table: str): # Add the table self.instance.currentObject.response[key].append(table) + def writeJson(self, data: IJson): + # Get the key to write to (official lane name is "json") + key = self._getkey('json') + + # If it isn't there, create it + if key not in self.instance.currentObject.response: + self.instance.currentObject.response[key] = [] + + # Add the json + self.instance.currentObject.response[key].append(IJson.toDict(data)) + def writeDocuments(self, documents: List[Doc]): # Get the key to write to key = self._getkey('documents') diff --git a/nodes/src/nodes/response/README.md b/nodes/src/nodes/response/README.md index c65a2608f..09e562b0a 100644 --- a/nodes/src/nodes/response/README.md +++ b/nodes/src/nodes/response/README.md @@ -12,6 +12,7 @@ Data handling per lane: - **text** - chunks are accumulated (joined with blank lines) and appended as one string per object. - **table**, **documents**, **questions** - appended as-is; documents and questions are serialized to plain dicts. +- **json** - appended as-is (one entry per write). - **answers** - appended as parsed JSON when the answer is JSON, otherwise as plain text. - **image** - streamed chunks are buffered via AVI_ACTION signals (BEGIN / WRITE / END), then the complete image is base64-encoded and appended as `{"mime_type": ..., "image": ...}`. - **audio**, **video** - the raw bytes are not returned; only tracking metadata is appended: `{"url", "aviAction", "mimeType", "size"}`. @@ -22,15 +23,16 @@ The node has no Python dependencies of its own (`requirements.txt` is empty); it ## Service variants -The same implementation is registered as nine services. The generic **HTTP Results** service (`response://`) accepts all eight lane types and lets you map each lane to its own result key. Eight single-lane variants accept exactly one lane each and expose a single `laneName` field: +The same implementation is registered as ten services. The generic **HTTP Results** service (`response://`) accepts all nine lane types and lets you map each lane to its own result key. Nine single-lane variants accept exactly one lane each and expose a single `laneName` field: | Service title | Protocol | Lane | Default result key | |------------------|---------------------------|-------------|--------------------| -| HTTP Results | `response://` | all eight | the lane type name | +| HTTP Results | `response://` | all nine | the lane type name | | Return Answers | `response_answers://` | `answers` | `answers` | | Return Audio | `response_audio://` | `audio` | `audio` | | Return Documents | `response_documents://` | `documents` | `documents` | | Return Image | `response_image://` | `image` | `image` | +| Return JSON | `response_json://` | `json` | `json` | | Return Questions | `response_questions://` | `questions` | `questions` | | Return Table | `response_table://` | `table` | `table` | | Return Text | `response_text://` | `text` | `text` | @@ -50,6 +52,7 @@ All lanes are inputs; the node produces no output lanes. |-------------|----------|-------------| | `text` | - | Captured under the configured key | | `table` | - | Captured under the configured key | +| `json` | - | Captured under the configured key | | `documents` | - | Captured under the configured key | | `questions` | - | Captured under the configured key | | `answers` | - | Captured under the configured key | @@ -132,6 +135,12 @@ Each configured result key holds an array: one element per result produced for t | `laneName` | `string` | **Result key** | | | `lanes` | `array` | **Lanes**
Each lane maps pipeline data to a custom JSON key in the response. Select the data type (text, documents, answers, etc.) for Lane Name, and enter a custom JSON key name (1-32 characters) for Result Key. | | +### Return JSON (`services.json.json`) + +| Field | Type | Description | Default | +|---|---|---|---| +| `laneName` | `string` | **Identifier key within result** | `"json"` | + ### Return Questions (`services.questions.json`) | Field | Type | Description | Default | diff --git a/nodes/src/nodes/response/services.json b/nodes/src/nodes/response/services.json index d2a9370d8..d097678c0 100644 --- a/nodes/src/nodes/response/services.json +++ b/nodes/src/nodes/response/services.json @@ -68,6 +68,7 @@ "lanes": { "text": [], "table": [], + "json": [], "documents": [], "questions": [], "answers": [], @@ -104,6 +105,7 @@ ["audio", "Audio"], ["documents", "Documents"], ["image", "Image"], + ["json", "JSON"], ["questions", "Questions"], ["table", "Table"], ["text", "Text"], diff --git a/nodes/src/nodes/response/services.json.json b/nodes/src/nodes/response/services.json.json new file mode 100644 index 000000000..1cacc4b78 --- /dev/null +++ b/nodes/src/nodes/response/services.json.json @@ -0,0 +1,111 @@ +{ + // + // Required: + // The displayable name of this node + // + "title": "Return JSON", + // + // Required: + // The protocol is the endpoint protocol + // + "protocol": "response_json://", + // + // Required: + // Class type of the node - what it does + // + "classType": ["infrastructure"], + // + // Required: + // Capabilities are flags that change the behavior of the underlying + // engine + // + "capabilities": [], + // + // Optional: + // Register is either filter, endpoint or ignored if not specified. If the + // type is specified, a factory is registered of that given type + // + "register": "filter", + // + // Optional: + // The node is the actual physical node to instantiate - if + // not specified, the protocol will be used + // + "node": "python", + // + // Optional: + // The path is the executable/script code - it is node dependent + // and is optional for most node + // + "path": "nodes.response", + // + // Required: + // The prefix map when added/removed when convertting URLs <=> paths + // + "prefix": "response", + // + // Optional: + // Description to of this driver + // + "description": ["A component that returns structured JSON data back to the requesting client. It handles ", "the response phase of the HTTP request-response cycle by returning the results as a JSON ", "object. This component ensures that data is transmitted efficiently and clearly to ", "the client, providing a standardized format for the response."], + // + // Optional: + // The icon is the icon to display in the UI for this node + // + "icon": "util-infrastructure.svg", + "documentation": "https://docs.rocketride.org", + // + // Optional: + // Rendering hints to the UI which indicate which fields of + // the configuration should be used to display information + "tile": [], + // + // Optional: + // As a pipe component, define what this pipe component takes + // and what it produces + // + "lanes": { + "json": [] + }, + // + // Optional: + // Profile section are configuration optoins used by the driver + // itself + // + "preconfig": { + // Define the values that will be merged into any profile configuration + // specified, unless the profile is 'absolute' + "default": "default", + // Defines profiles used with the "profile": key + "profiles": { + "default": {} + } + }, + // + // Optional: + // Local fields definitions - these define fields only for the + // current service. You may specify them here, or directly + // in the shape + // + "fields": { + "laneName": { + "type": "string", + "title": "Identifier key within result", + "minLength": 1, + "default": "json", + "maxLength": 32 + } + }, + // + // Required: + // Defines the fields (shape) of the service. Either source or target + // map be specified, or both, but at least one is required + // + "shape": [ + { + "section": "Pipe", + "title": "Return JSON", + "properties": ["laneName"] + } + ] +} diff --git a/nodes/src/nodes/webhook/README.md b/nodes/src/nodes/webhook/README.md index 0db293bab..2a88ff6fd 100644 --- a/nodes/src/nodes/webhook/README.md +++ b/nodes/src/nodes/webhook/README.md @@ -24,7 +24,7 @@ Each variant takes the internal `_source` input and emits to its declared output | Variant | Lane in | Lanes out | | -------- | ------- | ---------------------------------------------------- | -| Webhook | - | `tags`, `text`, `audio`, `video`, `image`, `questions` | +| Webhook | - | `tags`, `text`, `json`, `audio`, `video`, `image`, `questions` | | Chat | - | `questions` | | Dropper | - | `tags` | diff --git a/nodes/src/nodes/webhook/services.webhook.json b/nodes/src/nodes/webhook/services.webhook.json index 11899ca02..29d813997 100644 --- a/nodes/src/nodes/webhook/services.webhook.json +++ b/nodes/src/nodes/webhook/services.webhook.json @@ -66,7 +66,7 @@ // and what it produces // "lanes": { - "_source": ["tags", "text", "audio", "video", "image", "questions"] + "_source": ["tags", "text", "json", "audio", "video", "image", "questions"] }, // // Optional: diff --git a/packages/ai/src/ai/modules/data/data_conn.py b/packages/ai/src/ai/modules/data/data_conn.py index 3f7911eb8..f51062c5e 100644 --- a/packages/ai/src/ai/modules/data/data_conn.py +++ b/packages/ai/src/ai/modules/data/data_conn.py @@ -33,6 +33,7 @@ Ec, Entry, IInvokeTool, + IJson, IServiceEndpoint, IServiceFilterPipe, getObject, @@ -206,6 +207,10 @@ def _determine_lane(self, mime_type: str, pipe_instance: IServiceFilterPipe) -> elif mime_type.startswith('application/rocketride-question') and 'questions' in listeners: return 'questions' + # If this is json content and we have a json listener + elif mime_type.startswith('application/json') and 'json' in listeners: + return 'json' + # If this is text content and we have a text listener elif mime_type.startswith('text/') and 'text' in listeners: return 'text' @@ -595,6 +600,13 @@ def write_sync(): string_data = data.decode('utf-8') pipe.writeText(string_data) + elif lane == 'json': + try: + json_data = json.loads(data.decode('utf-8')) + pipe.writeJson(IJson(json_data)) + except Exception as e: + raise ValueError(str(e)) + elif lane == 'audio': pipe.writeAudio(AVI_ACTION.WRITE, mime_type, data) diff --git a/packages/ai/tests/ai/modules/data/test_data_conn.py b/packages/ai/tests/ai/modules/data/test_data_conn.py index 7b4cf40f4..693a99675 100644 --- a/packages/ai/tests/ai/modules/data/test_data_conn.py +++ b/packages/ai/tests/ai/modules/data/test_data_conn.py @@ -76,6 +76,27 @@ def test_determine_lane_question_without_listener_falls_back_to_raw(): assert conn._determine_lane('application/rocketride-question+json', pipe) == 'raw' +def test_determine_lane_json_with_json_listener(): + """'application/json' maps to the 'json' lane when that listener exists.""" + conn = _make_conn() + pipe = _make_pipe_with_listeners(['json']) + assert conn._determine_lane('application/json', pipe) == 'json' + + +def test_determine_lane_json_without_listener_falls_back_to_raw(): + """Without a 'json' listener, 'application/json' falls back to raw.""" + conn = _make_conn() + pipe = _make_pipe_with_listeners([]) + assert conn._determine_lane('application/json', pipe) == 'raw' + + +def test_determine_lane_json_tolerates_charset_parameter(): + """'application/json; charset=utf-8' still maps to the 'json' lane.""" + conn = _make_conn() + pipe = _make_pipe_with_listeners(['json']) + assert conn._determine_lane('application/json; charset=utf-8', pipe) == 'json' + + @pytest.mark.parametrize( 'mime, listener, expected', [ diff --git a/packages/server/engine-lib/engLib/python/IJson.cpp b/packages/server/engine-lib/engLib/python/IJson.cpp index 155bcfbf9..9de5d5d10 100644 --- a/packages/server/engine-lib/engLib/python/IJson.cpp +++ b/packages/server/engine-lib/engLib/python/IJson.cpp @@ -81,6 +81,8 @@ void setJsonValue(json::Value &target, target = value.cast(); else if (py::isinstance(value)) target = value.cast(); + else if (py::isinstance(value)) + target = *value.cast().getJsonValue(); else if (py::isinstance(value)) { // Set it as an object target = json::Value(json::objectValue); diff --git a/packages/server/engine-lib/engLib/store/core/binder.cpp b/packages/server/engine-lib/engLib/store/core/binder.cpp index 8af8e3b06..a9d2e5419 100644 --- a/packages/server/engine-lib/engLib/store/core/binder.cpp +++ b/packages/server/engine-lib/engLib/store/core/binder.cpp @@ -321,6 +321,27 @@ Error Binder::writeWords(const WordVector &textWords) noexcept { return callMethods(this, "words", call, serializeTrace); } +/** + * @brief Writes JSON data to all bound service filter instances. + * + * @param jsonData A JSON value to write. + * @return Error Returns an error code if any instance fails, otherwise success. + */ +Error Binder::writeJson(const json::Value &jsonData) noexcept { + auto call = localfcn(auto pInstance)->Error { + return pInstance->writeJson(jsonData); + }; + + auto serializeTrace = [&](PIPELINE_TRACE_LEVEL level, json::Value &out) { + if (level >= PIPELINE_TRACE_LEVEL::SUMMARY) + out["size"] = (int)jsonData.size(); + if (level >= PIPELINE_TRACE_LEVEL::FULL) + out["json"] = jsonData; + }; + + return callMethods(this, "json", call, serializeTrace); +} + /** * @brief Writes audio data to all bound service filter instances. * diff --git a/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.source.cpp b/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.source.cpp index cf50798ab..a32f1d405 100644 --- a/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.source.cpp +++ b/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.source.cpp @@ -244,6 +244,21 @@ void IServiceFilterInstance::cb_sendTable(const std::u16string &text) noexcept( } } +void IServiceFilterInstance::cb_sendJson( + python::IJson json) noexcept(false) { + // Check to make sure source mode + if (this->endpoint->config.endpointMode != ENDPOINT_MODE::SOURCE) + throw APERR(Ec::InvalidParam, + "You must be in source mode to use sendJson"); + + // Unlock python and sent it along + _block() { + engine::python::UnlockPython unlock; + + if (auto ccode = sendJson(*m_pTarget, *json.getJsonValue())) throw ccode; + } +} + void IServiceFilterInstance::cb_sendAudio( const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept(false) { diff --git a/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.target.cpp b/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.target.cpp index a74006458..c5a9d9e44 100644 --- a/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.target.cpp +++ b/packages/server/engine-lib/engLib/store/core/filter/filter.callbacks.target.cpp @@ -490,6 +490,21 @@ void IServiceFilterInstance::cb_writeWords( } } +void IServiceFilterInstance::cb_writeJson( + python::IJson json) noexcept(false) { + // Check to make sure target mode + if (endpoint->config.endpointMode != ENDPOINT_MODE::TARGET) + throw APERR(Ec::InvalidParam, + "You must be in target mode to use writeJson"); + + // Unlock python and send it along + _block() { + engine::python::UnlockPython unlock; + + if (auto ccode = binder.writeJson(*json.getJsonValue())) throw ccode; + } +} + void IServiceFilterInstance::cb_writeAudio( const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept(false) { diff --git a/packages/server/engine-lib/engLib/store/core/filter/filter.source.cpp b/packages/server/engine-lib/engLib/store/core/filter/filter.source.cpp index b223e96ca..785c81721 100644 --- a/packages/server/engine-lib/engLib/store/core/filter/filter.source.cpp +++ b/packages/server/engine-lib/engLib/store/core/filter/filter.source.cpp @@ -201,6 +201,16 @@ Error IServiceFilterInstance::sendWords(ServicePipe &target, return pDown->sendWords(target, textWords); } +Error IServiceFilterInstance::sendJson(ServicePipe &target, + const json::Value &jsonData) noexcept { + // Check the mode + if (this->endpoint->config.endpointMode != ENDPOINT_MODE::SOURCE) + return APERR(Ec::InvalidParam, "This function requires source mode"); + + // Send it along + return pDown->sendJson(target, jsonData); +} + Error IServiceFilterInstance::sendAudio( ServicePipe &target, const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept { diff --git a/packages/server/engine-lib/engLib/store/filters/bottom/bottom.hpp b/packages/server/engine-lib/engLib/store/filters/bottom/bottom.hpp index 0c223384f..a6bbd0973 100644 --- a/packages/server/engine-lib/engLib/store/filters/bottom/bottom.hpp +++ b/packages/server/engine-lib/engLib/store/filters/bottom/bottom.hpp @@ -155,6 +155,10 @@ class IFilterInstance : public IServiceFilterInstance { const Utf16View &text) noexcept override { return target->writeTable(text); } + virtual Error sendJson(ServicePipe &target, + const json::Value &jsonData) noexcept override { + return target->writeJson(jsonData); + } virtual Error sendAudio( ServicePipe &target, const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept override { @@ -216,6 +220,9 @@ class IFilterInstance : public IServiceFilterInstance { virtual Error writeWords(const WordVector &textWords) noexcept override { return {}; } + virtual Error writeJson(const json::Value &jsonData) noexcept override { + return {}; + } virtual Error writeAudio( const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept override { diff --git a/packages/server/engine-lib/engLib/store/headers/binder.hpp b/packages/server/engine-lib/engLib/store/headers/binder.hpp index 303ea36d8..f14fe1b5d 100644 --- a/packages/server/engine-lib/engLib/store/headers/binder.hpp +++ b/packages/server/engine-lib/engLib/store/headers/binder.hpp @@ -57,12 +57,13 @@ class Binder { methodMap; public: - static constexpr std::array MethodNames = { + static constexpr std::array MethodNames = { "open", "tags", "text", "table", "words", + "json", "audio", "video", "questions", @@ -105,6 +106,7 @@ class Binder { virtual Error writeText(const Utf16View &text) noexcept; virtual Error writeTable(const Utf16View &text) noexcept; virtual Error writeWords(const WordVector &textWords) noexcept; + virtual Error writeJson(const json::Value &jsonData) noexcept; virtual Error writeAudio(const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept; virtual Error writeVideo(const AVI_ACTION action, Text &mimeType, diff --git a/packages/server/engine-lib/engLib/store/headers/filter.hpp b/packages/server/engine-lib/engLib/store/headers/filter.hpp index 641bef5fd..efc6823e6 100644 --- a/packages/server/engine-lib/engLib/store/headers/filter.hpp +++ b/packages/server/engine-lib/engLib/store/headers/filter.hpp @@ -126,7 +126,7 @@ class IServiceFilterInstance { // Source mode (only available when ENDPOINT_MODE::SOURCE) // These functions are called by a source mode driver to send // data to the target the eventual target. They will call - // down through the source chain, eventualy end up in bottom + // down through the source chain, eventually end up in bottom // which will forward them to the top of the target //----------------------------------------------------------------- virtual uint32_t getThreadCount(uint32_t currentThreadCount) const noexcept; @@ -158,6 +158,8 @@ class IServiceFilterInstance { const Utf16View &text) noexcept; virtual Error sendWords(ServicePipe &target, const WordVector &textWords) noexcept; + virtual Error sendJson(ServicePipe &target, + const json::Value &jsonData) noexcept; virtual Error sendAudio(ServicePipe &target, const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept; @@ -205,6 +207,9 @@ class IServiceFilterInstance { virtual Error writeWords(const WordVector &textWords) noexcept { return binder.writeWords(textWords); } + virtual Error writeJson(const json::Value &jsonData) noexcept { + return binder.writeJson(jsonData); + } virtual Error writeAudio(const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept { return binder.writeAudio(action, mimeType, streamData); @@ -272,6 +277,7 @@ class IServiceFilterInstance { virtual void cb_sendTagData(py::object &data) noexcept(false); virtual void cb_sendText(const std::u16string &text) noexcept(false); virtual void cb_sendTable(const std::u16string &text) noexcept(false); + virtual void cb_sendJson(python::IJson json) noexcept(false); virtual void cb_sendAudio( const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept(false); @@ -328,6 +334,7 @@ class IServiceFilterInstance { virtual void cb_writeText(const std::u16string &text) noexcept(false); virtual void cb_writeTable(const std::u16string &text) noexcept(false); virtual void cb_writeWords(const WordVector &textWords) noexcept(false); + virtual void cb_writeJson(python::IJson json) noexcept(false); virtual void cb_writeAudio( const AVI_ACTION action, Text &mimeType, const pybind11::bytes &streamData) noexcept(false); @@ -481,7 +488,7 @@ class IServiceFilterInstance { //----------------------------------------------------------------- /// @details - /// The collected metdata + /// The collected metadata //----------------------------------------------------------------- json::Value m_metadata; diff --git a/packages/server/engine-lib/engLib/store/python/bindings.cpp b/packages/server/engine-lib/engLib/store/python/bindings.cpp index 3c2cdaaea..dc7bc1d52 100644 --- a/packages/server/engine-lib/engLib/store/python/bindings.cpp +++ b/packages/server/engine-lib/engLib/store/python/bindings.cpp @@ -900,6 +900,7 @@ PYBIND11_EMBEDDED_MODULE(engLib, engLib) { .PYBIND(sendTagEndObject, &IServiceFilterInstance::cb_sendTagEndObject) .PYBIND(sendText, &IServiceFilterInstance::cb_sendText) .PYBIND(sendTable, &IServiceFilterInstance::cb_sendTable) + .PYBIND(sendJson, &IServiceFilterInstance::cb_sendJson) .PYBIND(sendAudio, &IServiceFilterInstance::cb_sendAudio, py::arg("action"), py::arg("mimeType"), py::arg("streamData") = py::bytes()) @@ -943,6 +944,7 @@ PYBIND11_EMBEDDED_MODULE(engLib, engLib) { .PYBIND(writeTag, &IServiceFilterInstance::cb_writeTag) .PYBIND(writeText, &IServiceFilterInstance::cb_writeText) .PYBIND(writeTable, &IServiceFilterInstance::cb_writeTable) + .PYBIND(writeJson, &IServiceFilterInstance::cb_writeJson) .PYBIND(writeAudio, &IServiceFilterInstance::cb_writeAudio, py::arg("action"), py::arg("mimeType"), py::arg("streamData") = py::bytes()) @@ -1177,6 +1179,9 @@ PYBIND11_EMBEDDED_MODULE(engLib, engLib) { .PYBIND(__getitem__, &IJson::getitem) .PYBIND(__setitem__, &IJson::setitem); + py::implicitly_convertible(); + py::implicitly_convertible(); + //------------------------------------------------------------- /// @details /// Bind error codes (Ec enum) diff --git a/packages/server/engine-lib/engLib/store/python/python-base.hpp b/packages/server/engine-lib/engLib/store/python/python-base.hpp index 4d2d43d36..f4076e2a9 100644 --- a/packages/server/engine-lib/engLib/store/python/python-base.hpp +++ b/packages/server/engine-lib/engLib/store/python/python-base.hpp @@ -41,7 +41,7 @@ class IPythonInstanceBase; /// the method is implemented by the underlying python node or not //------------------------------------------------------------------------- APUTIL_DEFINE_ENUM_BITMASK( - PythonInstanceMethod, 0, 24, None = 0, + PythonInstanceMethod, 0, 25, None = 0, BeginInstance = BIT(0), EndInstance = BIT(1), @@ -55,7 +55,9 @@ APUTIL_DEFINE_ENUM_BITMASK( WriteDocuments = BIT(18), GetPermissions = BIT(19), OutputPermissions = BIT(20), - GetPermissionsBulk = BIT(21), GetThreadCount = BIT(22)); + GetPermissionsBulk = BIT(21), GetThreadCount = BIT(22), + + WriteJson = BIT(23)); //------------------------------------------------------------------------- /// @details @@ -268,6 +270,12 @@ class IPythonInstanceBase : public IServiceFilterInstance { //----------------------------------------------------------------- virtual Error writeWords(const WordVector &textWords) noexcept override; + //----------------------------------------------------------------- + // Public API : python-instance.json.cpp + // Supports json lane + //----------------------------------------------------------------- + virtual Error writeJson(const json::Value &jsonData) noexcept override; + //----------------------------------------------------------------- // Public API : python-instance.avi.cpp // Supports avi lanes diff --git a/packages/server/engine-lib/engLib/store/python/python-instance.cpp b/packages/server/engine-lib/engLib/store/python/python-instance.cpp index 9292d150a..12f744b7a 100644 --- a/packages/server/engine-lib/engLib/store/python/python-instance.cpp +++ b/packages/server/engine-lib/engLib/store/python/python-instance.cpp @@ -107,6 +107,7 @@ IPythonInstanceBase::IPythonInstanceBase(const FactoryArgs &args) noexcept bindMethod(PythonMethod::WriteText, "writeText"); bindMethod(PythonMethod::WriteTable, "writeTable"); bindMethod(PythonMethod::WriteWords, "writeWords"); + bindMethod(PythonMethod::WriteJson, "writeJson"); bindMethod(PythonMethod::WriteAudio, "writeAudio"); bindMethod(PythonMethod::WriteVideo, "writeVideo"); bindMethod(PythonMethod::WriteImage, "writeImage"); diff --git a/packages/server/engine-lib/engLib/store/python/python-instance.json.cpp b/packages/server/engine-lib/engLib/store/python/python-instance.json.cpp new file mode 100644 index 000000000..fa158af96 --- /dev/null +++ b/packages/server/engine-lib/engLib/store/python/python-instance.json.cpp @@ -0,0 +1,54 @@ +// ============================================================================= +// MIT License +// Copyright (c) 2026 Aparavi Software AG +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// ============================================================================= + +#include + +namespace engine::store::pythonBase { +//------------------------------------------------------------------------- +/// @details +/// Notify the filter of incoming JSON data +//------------------------------------------------------------------------- +Error IPythonInstanceBase::writeJson(const json::Value &jsonData) noexcept { + Error ccode; + + LOGPIPE(); + + // If the method is not supported, push it down + if (!(m_pyMethods & PythonMethod::WriteJson)) + return Parent::writeJson(jsonData); + + auto python = localfcn()->Error { + auto pyJson = IJson(jsonData); + + // Call it + m_pyInstance.attr("writeJson")(pyJson); + return {}; + }; + + ccode = callPython(python); + + if (checkCallParent(ccode)) return Parent::writeJson(jsonData); + + return ccode; +} +} // namespace engine::store::pythonBase diff --git a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.py b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.py index 27befb45b..be2019392 100644 --- a/packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.py +++ b/packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.py @@ -24,7 +24,7 @@ from __future__ import annotations # Enables forward references from typing import TYPE_CHECKING, Dict, Any, List, TypedDict, Callable, Protocol -from .types import OPEN_MODE, ENDPOINT_MODE, SERVICE_MODE, Entry, IControl, IInvoke +from .types import OPEN_MODE, ENDPOINT_MODE, SERVICE_MODE, Entry, IControl, IInvoke, IJson from .error import APERR, Ec if TYPE_CHECKING: @@ -393,6 +393,10 @@ def sendTable(self, table: str) -> None: """Send a table structure.""" pass + def sendJson(self, data: IJson) -> None: + """Send a JSON object.""" + pass + def sendAudio(self, action: int, mimeType: str, buffer: bytes) -> None: """Send an audio buffer with the given action and MIME type.""" pass @@ -517,6 +521,10 @@ def writeTable(self, table: str) -> None: """Send a table structure.""" pass + def writeJson(self, data: IJson) -> None: + """Send a JSON object.""" + pass + def writeAudio(self, action: int, mimeType: str, buffer: bytes) -> None: """Send an audio buffer with the given action and MIME type.""" pass @@ -953,6 +961,10 @@ def writeTable(self, table: str) -> None: """Send a table structure.""" pass + def writeJson(self, data: IJson) -> None: + """Send a JSON object.""" + pass + def writeAudio(self, action: int, mimeType: str, buffer: bytes) -> None: """Send an audio buffer with the given action and MIME type.""" pass diff --git a/packages/server/engine-lib/test/store/tests/json.cpp b/packages/server/engine-lib/test/store/tests/json.cpp new file mode 100644 index 000000000..0f5f55dd8 --- /dev/null +++ b/packages/server/engine-lib/test/store/tests/json.cpp @@ -0,0 +1,124 @@ +// ============================================================================= +// MIT License +// Copyright (c) 2026 Aparavi Software AG +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// ============================================================================= + +#include "../store.h" + +/** + * Pipeline that routes the json lane into the dedicated Return JSON response + * node (response_json://), which writes each input lane into the object + * results. The json values land in response["json"] as an array (one entry per + * writeJson call), mirroring how the response node handles the table lane. + */ +class JsonFilterTest : public IFilterTest { +public: + JsonFilterTest() : IFilterTest({"response_json"}) {} + + /// Returns the array of json values the response node captured, or a null + /// value if the json lane was not written to the object results. + json::Value getJsonResults() const noexcept { + const auto &entry = getEntry(); + if (entry.response && entry.response().isObject()) { + auto value = entry.response().lookup("json"); + if (value.isArray()) return value; + } + return {}; + } + + /// Returns the first captured json value (the common single-write case). + json::Value getJson() const noexcept { + auto results = getJsonResults(); + if (results.isArray() && !results.empty()) return results[0]; + return {}; + } +}; + +TEST_CASE("store::json") { + //----------------------------------------------------------------- + // The response node must capture a json-lane write into the object + // results, preserving scalars, nested objects and arrays. + //----------------------------------------------------------------- + SECTION("response captures json object") { + JsonFilterTest filter; + + // Build and connect the endpoint + REQUIRE_NO_ERROR(filter.connect()); + + // Get a source pipe, open a dummy object on it + auto pipe = filter.openObject("test.json"_tv, Entry::FLAGS::INDEX); + REQUIRE_NO_ERROR(pipe); + + // Build a representative payload: scalars, a nested object and an array + json::Value data; + data["title"] = "hello"; + data["count"] = 42; + data["nested"]["label"] = "x"; + data["tags"].append("a"); + data["tags"].append("b"); + + // Send it down the json lane + REQUIRE_NO_ERROR(pipe->writeJson(data)); + + // Close the object so the response node finalizes the result + REQUIRE_NO_ERROR(filter.closeObject(*pipe)); + + // The response node must have captured the json lane into the object + // result. (Fails until the response node implements writeJson.) + auto captured = filter.getJson(); + REQUIRE(captured.isObject()); + REQUIRE(captured.isMember("title")); + REQUIRE(captured["title"].asString() == "hello"); + REQUIRE(captured["count"] == 42); + REQUIRE(captured.isMember("nested")); + REQUIRE(captured["nested"]["label"].asString() == "x"); + REQUIRE(captured["tags"].isArray()); + REQUIRE(captured["tags"].size() == 2); + } + + //----------------------------------------------------------------- + // Each json-lane write on an object must be captured as a separate + // entry in the response array. + //----------------------------------------------------------------- + SECTION("response captures multiple json writes") { + JsonFilterTest filter; + + REQUIRE_NO_ERROR(filter.connect()); + + auto pipe = filter.openObject("test.json"_tv, Entry::FLAGS::INDEX); + REQUIRE_NO_ERROR(pipe); + + for (int i = 0; i < 3; i++) { + json::Value data; + data["index"] = i; + REQUIRE_NO_ERROR(pipe->writeJson(data)); + } + + REQUIRE_NO_ERROR(filter.closeObject(*pipe)); + + // All three writes must be captured in order. + auto results = filter.getJsonResults(); + REQUIRE(results.isArray()); + REQUIRE(results.size() == 3); + REQUIRE(results[0]["index"] == 0); + REQUIRE(results[2]["index"] == 2); + } +}